Permalink
Browse files

Separate disconnect from on_connection and also add on_destroy

Break on_connection event into on_connect and on_disconnect events to
provide more fine-graine control over the TCPConnection instance.

Add on_destroy event on TCPConnection for notifying its owner that the
instance is going to destroy; and the owner can now safely get
destructed.

Different semantics between on_disconnect and on_destroy is simple and
clear: on_disconnect event indicates the connection is closed and any
further data transmission should be stopped outright and subsequent
cleanup could be fulfilled internally; while on_destroy implies that the
TCPConnection instance is about to destroy, and you shouldn't perform
any operation on it but leave it to destruction.

One usage of on_destroy event is how chat-client safelty quits its main
loop.
  • Loading branch information...
kingsamchen committed Jan 20, 2019
1 parent 7e81266 commit d9e7bde28155b5bb2f144dd4c42083d95ef7b674
@@ -14,7 +14,11 @@ ChatClient::ChatClient(const ezio::SocketAddress& sockaddr)
: thread_("client-network"),
client_(thread_.event_loop(), sockaddr, "ChatClient")
{
client_.set_on_connection(std::bind(&ChatClient::OnConnection, this, _1));
client_.set_on_connect(std::bind(&ChatClient::OnConnection, this, _1));
client_.set_on_disconnect(std::bind(&ChatClient::OnConnection, this, _1));
client_.set_on_connection_destroy([this](const auto&) {
main_loop_.Quit();
});
client_.set_on_message(std::bind(&DataCodec::OnDataReceive, &codec_, _1, _2, _3));

codec_.set_on_message(std::bind(&ChatClient::OnMessage, this, _1, _2, _3));
@@ -57,10 +61,6 @@ void ChatClient::OnConnection(const ezio::TCPConnectionPtr& conn)
} else {
conn_ = nullptr;
printf("Bye!\n");
// HACK: ensure calling dtor of `client_` after handling disconnection.
// TODO: Use a more proper way to eliminate this hack.
main_loop_.RunTaskAfter(std::bind(&ezio::EventLoop::Quit, &main_loop_),
std::chrono::seconds(2));
}
}

@@ -17,7 +17,8 @@ using namespace std::placeholders;
ChatServer::ChatServer(unsigned short port)
: srv_(&loop_, ezio::SocketAddress(port), "ChatServer")
{
srv_.set_on_connection(std::bind(&ChatServer::OnConnection, this, _1));
srv_.set_on_connect(std::bind(&ChatServer::OnConnection, this, _1));
srv_.set_on_disconnect(std::bind(&ChatServer::OnConnection, this, _1));
srv_.set_on_message(std::bind(&DataCodec::OnDataReceive, &codec_, _1, _2, _3));

codec_.set_on_command(std::bind(&ChatServer::OnCommand, this, _1, _2, _3));
@@ -22,7 +22,8 @@ class EchoClient {
explicit EchoClient(const ezio::SocketAddress& addr)
: tcp_client_(&io_loop_, addr, "EchoClient")
{
tcp_client_.set_on_connection(std::bind(&EchoClient::OnConnection, this, _1));
tcp_client_.set_on_connect(std::bind(&EchoClient::OnConnection, this, _1));
tcp_client_.set_on_disconnect(std::bind(&EchoClient::OnConnection, this, _1));
tcp_client_.set_on_message(std::bind(&EchoClient::OnReceiveMessage, this, _1, _2, _3));
}

@@ -39,12 +40,15 @@ class EchoClient {
}

private:
void ReadUserInput() const
void ReadUserInput()
{
ENSURE(CHECK, !!conn_).Require();

std::string s;
std::getline(std::cin, s, '\n');
if (!std::getline(std::cin, s, '\n')) {
tcp_client_.Disconnect();
return;
}

conn_->Send(s);
}
@@ -58,13 +62,13 @@ class EchoClient {
conn_ = conn;
ReadUserInput();
} else {
LOG(INFO) << "Server is down due to unknown causes";
LOG(INFO) << "Disconnect from the server";
conn_ = nullptr;
io_loop_.Quit();
}
}

void OnReceiveMessage(const ezio::TCPConnectionPtr&, ezio::Buffer& buf, ezio::TimePoint) const
void OnReceiveMessage(const ezio::TCPConnectionPtr&, ezio::Buffer& buf, ezio::TimePoint)
{
std::cout << buf.ReadAllAsString() << "\n";
ReadUserInput();
@@ -25,7 +25,8 @@ class EchoServer {
connections_(0),
received_messages_(0)
{
tcp_srv_.set_on_connection(std::bind(&EchoServer::OnConnection, this, _1));
tcp_srv_.set_on_connect(std::bind(&EchoServer::OnConnection, this, _1));
tcp_srv_.set_on_disconnect(std::bind(&EchoServer::OnConnection, this, _1));
tcp_srv_.set_on_message(std::bind(&EchoServer::OnMessage, this, _1, _2, _3));
io_loop_.RunTaskEvery(std::bind(&EchoServer::DumpServerStatus, this),
std::chrono::seconds(5));
@@ -59,14 +60,13 @@ class EchoServer {
if (conn->connected()) {
action = "connected";
connections_.fetch_add(1);
conn->SetTCPNoDelay(true);
} else {
action = "disconnected";
connections_.fetch_sub(1);
}

LOG(INFO) << conn->name() << " at " << conn->peer_addr().ToHostPort() << " is " << action;

conn->SetTCPNoDelay(true);
}

void OnMessage(const ezio::TCPConnectionPtr& conn, ezio::Buffer& buf, ezio::TimePoint)
@@ -18,8 +18,9 @@ class TCPConnection;
using TCPConnectionPtr = std::shared_ptr<TCPConnection>;

using ConnectionEventHandler = std::function<void(const TCPConnectionPtr&)>;
using MessageEventHandler = std::function<void(const TCPConnectionPtr&, Buffer&, TimePoint)>;
using CloseEventHandler = std::function<void(const TCPConnectionPtr&)>;
using DestroyEventHandler = std::function<void(const TCPConnectionPtr&)>;
using MessageEventHandler = std::function<void(const TCPConnectionPtr&, Buffer&, TimePoint)>;

} // namespace ezio

@@ -11,6 +11,13 @@

#include "ezio/event_loop.h"

namespace {

void OnConnectionDestroyDefault(const ezio::TCPConnectionPtr&)
{}

} // namespace

namespace ezio {

using namespace std::placeholders;
@@ -22,6 +29,7 @@ TCPClient::TCPClient(EventLoop* loop, const SocketAddress& remote_addr, std::str
next_conn_id_(0),
auto_reconnect_(false),
connector_(MakeConnector(loop, remote_addr)),
on_connection_destroy_(&OnConnectionDestroyDefault),
alive_token_(std::make_shared<AliveToken>())
{
connector_->set_on_new_connection(std::bind(&TCPClient::HandleConnection, this, _1, _2));
@@ -114,9 +122,12 @@ void TCPClient::HandleConnection(ScopedSocket&& sock, const SocketAddress& local
local_addr,
peer_addr);

conn->set_on_connection(on_connection_);
conn->set_on_message(on_message_);
conn->set_on_connect(on_connect_);
conn->set_on_disconnect(on_disconnect_);
conn->set_on_close(std::bind(&TCPClient::HandleDisconnection, this, _1));
conn->set_on_destroy(on_connection_destroy_);

conn->set_on_message(on_message_);

{
std::lock_guard<std::mutex> lock(conn_mutex_);
@@ -75,9 +75,19 @@ class TCPClient {
auto_reconnect_ = enable;
}

void set_on_connection(ConnectionEventHandler handler)
void set_on_connect(ConnectionEventHandler handler)
{
on_connection_ = std::move(handler);
on_connect_ = std::move(handler);
}

void set_on_disconnect(ConnectionEventHandler handler)
{
on_disconnect_ = std::move(handler);
}

void set_on_connection_destroy(DestroyEventHandler handler)
{
on_connection_destroy_ = std::move(handler);
}

void set_on_message(MessageEventHandler handler)
@@ -99,7 +109,10 @@ class TCPClient {
bool auto_reconnect_;
std::unique_ptr<Connector> connector_;

ConnectionEventHandler on_connection_;
ConnectionEventHandler on_connect_;
ConnectionEventHandler on_disconnect_;
DestroyEventHandler on_connection_destroy_;

MessageEventHandler on_message_;

mutable std::mutex conn_mutex_;
@@ -79,7 +79,7 @@ void TCPConnection::MakeEstablished()
conn_notifier_.WeaklyBind(shared_from_this());
conn_notifier_.EnableReading();

on_connection_(shared_from_this());
on_connect_(shared_from_this());

#if defined(OS_WIN)
ENSURE(CHECK, io_reqs_.read_req.IsProbing())(io_reqs_.read_req.events).Require();
@@ -94,10 +94,13 @@ void TCPConnection::MakeTeardown()
auto running_state = State::Connected;
if (state_.compare_exchange_strong(running_state, State::Disconnected,
std::memory_order_acq_rel, std::memory_order_relaxed)) {
on_disconnect_(shared_from_this());

conn_notifier_.DisableAll();
on_connection_(shared_from_this());
}

on_destroy_(shared_from_this());

conn_notifier_.Detach();
}

@@ -147,7 +150,7 @@ void TCPConnection::HandleClose()

TCPConnectionPtr conn(shared_from_this());

on_connection_(conn);
on_disconnect_(conn);
on_close_(conn);
}

@@ -87,21 +87,31 @@ class TCPConnection : public std::enable_shared_from_this<TCPConnection> {
return peer_addr_;
}

void set_on_connection(ConnectionEventHandler handler)
void set_on_connect(ConnectionEventHandler handler)
{
on_connection_ = std::move(handler);
on_connect_ = std::move(handler);
}

void set_on_message(MessageEventHandler handler)
{
on_message_ = std::move(handler);
}

void set_on_disconnect(ConnectionEventHandler handler)
{
on_disconnect_ = std::move(handler);
}

void set_on_close(CloseEventHandler handler)
{
on_close_ = std::move(handler);
}

void set_on_destroy(DestroyEventHandler handler)
{
on_destroy_ = std::move(handler);
}

private:
State state() const noexcept
{
@@ -165,9 +175,11 @@ class TCPConnection : public std::enable_shared_from_this<TCPConnection> {
IORequests io_reqs_;
#endif

ConnectionEventHandler on_connection_;
ConnectionEventHandler on_connect_;
MessageEventHandler on_message_;
ConnectionEventHandler on_disconnect_;
CloseEventHandler on_close_;
DestroyEventHandler on_destroy_;
};

using TCPConnectionPtr = std::shared_ptr<TCPConnection>;
@@ -11,6 +11,13 @@

#include "ezio/event_loop.h"

namespace {

void OnConnectionDestroyDefault(const ezio::TCPConnectionPtr&)
{}

} // namespace

namespace ezio {

using namespace std::placeholders;
@@ -21,7 +28,8 @@ TCPServer::TCPServer(ezio::EventLoop* loop, const SocketAddress& addr, std::stri
name_(std::move(name)),
started_(false),
acceptor_(loop, addr),
next_conn_id_(0)
next_conn_id_(0),
on_connection_destroy_(&OnConnectionDestroyDefault)
{
acceptor_.set_on_new_connection(std::bind(&TCPServer::HandleNewConnection, this, _1, _2));
}
@@ -77,11 +85,14 @@ void TCPServer::HandleNewConnection(ScopedSocket&& conn_sock, const SocketAddres

connections_.insert({conn->name(), conn});

conn->set_on_connection(on_connection_);
conn->set_on_message(on_message_);
conn->set_on_connect(on_connect_);
conn->set_on_disconnect(on_disconnect_);
conn->set_on_close([this](const TCPConnectionPtr& conn_ptr) {
loop_->RunTask(std::bind(&TCPServer::RemoveConnection, this, conn_ptr));
});
conn->set_on_destroy(on_connection_destroy_);

conn->set_on_message(on_message_);

conn_loop->RunTask(std::bind(&TCPConnection::MakeEstablished, conn));
}
@@ -60,9 +60,19 @@ class TCPServer {
return listen_addr_.ToHostPort();
}

void set_on_connection(ConnectionEventHandler handler)
void set_on_connect(ConnectionEventHandler handler)
{
on_connection_ = std::move(handler);
on_connect_ = std::move(handler);
}

void set_on_disconnect(ConnectionEventHandler handler)
{
on_disconnect_ = std::move(handler);
}

void set_on_connection_destroy(DestroyEventHandler handler)
{
on_connection_destroy_ = std::move(handler);
}

void set_on_message(MessageEventHandler handler)
@@ -92,7 +102,10 @@ class TCPServer {

ConnectionMap connections_;

ConnectionEventHandler on_connection_;
ConnectionEventHandler on_connect_;
ConnectionEventHandler on_disconnect_;
DestroyEventHandler on_connection_destroy_;

MessageEventHandler on_message_;
};

Oops, something went wrong.

0 comments on commit d9e7bde

Please sign in to comment.