Browse files

use internal destruction instead of public cancel() for destroying me…

…ssages.

that will simplify the public interface.
  • Loading branch information...
1 parent 6fbb50a commit 97dce619acc9c9d163de26f08e00bf4e9df0dcb8 @jinmei committed Mar 31, 2012
View
122 src/lib/asio_message_manager.cc
@@ -22,6 +22,7 @@
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/lexical_cast.hpp>
+#include <memory>
#include <string>
#include <iostream>
@@ -35,13 +36,26 @@ using boost::lexical_cast;
namespace Queryperf {
-class UDPMessageSocket : public ASIOMessageSocket {
+class ASIOMessageSocket::ASIOMessageSocketImpl {
+protected:
+ ASIOMessageSocketImpl() {}
+public:
+ virtual ~ASIOMessageSocketImpl() {}
+ virtual void send(const void* data, size_t datalen) = 0;
+ virtual void cancel() = 0;
+ virtual int native() = 0;
+};
+
+namespace {
+class UDPMessageSocket : public ASIOMessageSocket::ASIOMessageSocketImpl {
public:
UDPMessageSocket(io_service& io_service, const string& address,
uint16_t port, void* recvbuf, size_t recvbuf_len,
- Callback callback);
+ MessageSocket::Callback callback);
virtual void send(const void* data, size_t datalen);
- virtual void cancel();
+ virtual void cancel() { // in our simplified usage, this is enough
+ delete this;
+ }
virtual int native() { return (asio_sock_.native()); }
@@ -51,7 +65,7 @@ class UDPMessageSocket : public ASIOMessageSocket {
private:
udp::socket asio_sock_;
- Callback callback_;
+ MessageSocket::Callback callback_;
bool receiving_;
void* recvbuf_;
size_t recvbuf_len_;
@@ -60,7 +74,7 @@ class UDPMessageSocket : public ASIOMessageSocket {
UDPMessageSocket::UDPMessageSocket(io_service& io_service,
const string& address, uint16_t port,
void* recvbuf, size_t recvbuf_len,
- Callback callback) :
+ MessageSocket::Callback callback) :
asio_sock_(io_service), callback_(callback), receiving_(false),
recvbuf_(recvbuf), recvbuf_len_(recvbuf_len)
{
@@ -96,32 +110,33 @@ UDPMessageSocket::send(const void* data, size_t datalen) {
}
void
-UDPMessageSocket::cancel() {
- // In our usage we don't need this. We can postpone implementing it.
- throw MessageSocketError("cancel on UDP socket is not supported yet");
-}
-
-void
UDPMessageSocket::handleRead(const asio::error_code& ec, size_t length) {
if (ec) {
throw MessageSocketError("unexpected failure on socket read: " +
ec.message());
}
- callback_(Event(recvbuf_, length));
+ callback_(MessageSocket::Event(recvbuf_, length));
asio_sock_.async_receive(asio::buffer(recvbuf_, recvbuf_len_),
boost::bind(&UDPMessageSocket::handleRead,
this, _1, _2));
}
-class TCPMessageSocket : public ASIOMessageSocket {
+class TCPMessageSocket : public ASIOMessageSocket::ASIOMessageSocketImpl {
public:
TCPMessageSocket(ASIOMessageManager* manager,
io_service& io_service, const string& address,
uint16_t port, void* recvbuf, size_t recvbuf_len,
- Callback callback);
+ MessageSocket::Callback callback);
virtual void send(const void* data, size_t datalen);
virtual void cancel();
virtual int native() { return (asio_sock_.native()); }
+
+private:
+ void handleConnect(const asio::error_code& ec);
+ void handleWrite(const asio::error_code& ec, size_t length);
+ void handleReadLength(const asio::error_code& ec, size_t length);
+ void handleReadData(const asio::error_code& ec, size_t length);
+
bool cancelCheck(const asio::error_code& ec) {
if (!cancelled_) {
return (false);
@@ -132,36 +147,36 @@ class TCPMessageSocket : public ASIOMessageSocket {
return (true);
}
-private:
- void handleConnect(const asio::error_code& ec);
- void handleWrite(const asio::error_code& ec, size_t length);
- void handleReadLength(const asio::error_code& ec, size_t length);
- void handleReadData(const asio::error_code& ec, size_t length);
+ void sendCallback(const void* callback_data, size_t data_len) {
+ completed_ = true;
+ callback_(MessageSocket::Event(callback_data, data_len));
+ }
private:
ASIOMessageManager* manager_;
tcp::socket asio_sock_;
asio::error_code asio_error_; // placeholder for getting ASIO error
tcp::endpoint dest_;
- Callback callback_;
+ MessageSocket::Callback callback_;
void* recvbuf_; // for the first message
size_t recvbuf_len_; // available size of recvbuf_
size_t recvdata_len_; // actual message length of the first message
uint8_t aux_recvbuf_[65535]; // placeholder for subsequent messages
uint8_t msglen_placeholder_[2];
boost::array<asio::const_buffer, 2> sendbufs_;
bool cancelled_;
+ bool completed_;
};
TCPMessageSocket::TCPMessageSocket(ASIOMessageManager* manager,
io_service& io_service,
const string& address, uint16_t port,
void* recvbuf, size_t recvbuf_len,
- Callback callback) :
+ MessageSocket::Callback callback) :
manager_(manager), asio_sock_(io_service),
dest_(asio::ip::address::from_string(address), port),
callback_(callback), recvbuf_(recvbuf), recvbuf_len_(recvbuf_len),
- recvdata_len_(0), cancelled_(false)
+ recvdata_len_(0), cancelled_(false), completed_(false)
{
// Note: we don't even open the socket yet.
}
@@ -180,9 +195,16 @@ TCPMessageSocket::send(const void* data, size_t datalen) {
void
TCPMessageSocket::cancel() {
- assert(!cancelled_);
- asio_sock_.cancel();
- cancelled_ = true;
+ if (asio_sock_.is_open() && !completed_) {
+ // Initiate delayed abort.
+ assert(!cancelled_);
+ asio_sock_.cancel();
+ cancelled_ = true;
+ } else {
+ // If it's not even opened yet, there's nothing to do. Just kill
+ // itself.
+ delete this;
+ }
}
void
@@ -192,7 +214,7 @@ TCPMessageSocket::handleConnect(const asio::error_code& ec) {
}
if (ec) {
cerr << "[Warn] TCP connect failed: " << ec.message() << endl;
- callback_(Event(NULL, 0));
+ sendCallback(NULL, 0);
return;
}
asio::async_write(asio_sock_, sendbufs_,
@@ -207,7 +229,7 @@ TCPMessageSocket::handleWrite(const asio::error_code& ec, size_t) {
}
if (ec) {
cerr << "[Warn] TCP send failed: " << ec.message() << endl;
- callback_(Event(NULL, 0));
+ sendCallback(NULL, 0);
return;
}
// Immediately after sending the query, shutdown the outbound direction
@@ -216,7 +238,7 @@ TCPMessageSocket::handleWrite(const asio::error_code& ec, size_t) {
if (asio_error_) {
cerr << "[Warn] failed to shut down TCP socket: "
<< asio_error_.message() << endl;
- callback_(Event(NULL, 0));
+ sendCallback(NULL, 0);
return;
}
@@ -236,13 +258,13 @@ TCPMessageSocket::handleReadLength(const asio::error_code& ec, size_t length) {
// We've received all messages. Note that this includes the case
// where the server closes the connection without sending any message
// or with partial message.
- callback_(Event(recvbuf_, recvdata_len_));
+ sendCallback(recvbuf_, recvdata_len_);
return;
}
if (ec) {
cerr << "[Warn] failed to read TCP message length: "
<< ec.message() << endl;
- callback_(Event(NULL, 0));
+ sendCallback(NULL, 0);
return;
}
if (length != sizeof(msglen_placeholder_)) {
@@ -271,12 +293,12 @@ TCPMessageSocket::handleReadData(const asio::error_code& ec, size_t length) {
// We've received all messages. This is an unexpected connection
// termination by the server. Do the callback with what we've had
// so far anyway.
- callback_(Event(recvbuf_, recvdata_len_));
+ sendCallback(recvbuf_, recvdata_len_);
return;
}
if (ec) {
cerr << "[Warn] failed to read TCP message: " << ec.message() << endl;
- callback_(Event(NULL, recvdata_len_));
+ sendCallback(NULL, recvdata_len_);
return;
}
// If this is the first message, remember its length.
@@ -293,6 +315,24 @@ TCPMessageSocket::handleReadData(const asio::error_code& ec, size_t length) {
boost::bind(&TCPMessageSocket::handleReadLength,
this, _1, _2));
}
+} // end of unnamed namespace
+
+ASIOMessageSocket::~ASIOMessageSocket() {
+ // The ownership is being released from the caller. The underlying
+ // impl object will be responsible for destructing itself.
+ impl_->cancel();
+ impl_ = NULL; // not necessary, but just in case
+}
+
+void
+ASIOMessageSocket::send(const void* data, size_t datalen) {
+ impl_->send(data, datalen);
+}
+
+int
+ASIOMessageSocket::native() {
+ return (impl_->native());
+}
struct ASIOMessageManager::ASIOMessageManagerImpl {
io_service io_service_;
@@ -312,18 +352,28 @@ ASIOMessageManager::createMessageSocket(int proto, const string& address,
void* recvbuf, size_t recvbuf_len,
MessageSocket::Callback callback)
{
+ MessageSocket* ret;
+
if (!callback) {
throw MessageSocketError("null socket callback specified");
}
if (proto == IPPROTO_UDP) {
- return (new UDPMessageSocket(impl_->io_service_, address, port,
- recvbuf, recvbuf_len, callback));
+ auto_ptr<UDPMessageSocket> impl_p(
+ new UDPMessageSocket(impl_->io_service_, address, port,
+ recvbuf, recvbuf_len, callback));
+ ret = new ASIOMessageSocket(impl_p.get());
+ impl_p.release();
+ return (ret);
} else if (proto == IPPROTO_TCP) {
if (recvbuf_len < 65535) { // must be able to hold a full TCP msg
throw MessageSocketError("Insufficient TCP receive buffer");
}
- return (new TCPMessageSocket(this, impl_->io_service_, address, port,
- recvbuf, recvbuf_len, callback));
+ auto_ptr<TCPMessageSocket> impl_p(
+ new TCPMessageSocket(this, impl_->io_service_, address, port,
+ recvbuf, recvbuf_len, callback));
+ ret = new ASIOMessageSocket(impl_p.get());
+ impl_p.release();
+ return (ret);
}
throw MessageSocketError("unsupported or invalid protocol: " +
lexical_cast<string>(proto));
View
16 src/lib/asio_message_manager.h
@@ -27,17 +27,21 @@
namespace Queryperf {
class ASIOMessageSocket : public MessageSocket {
-protected:
- ASIOMessageSocket() {}
public:
- virtual ~ASIOMessageSocket() {}
- virtual void send(const void* data, size_t datalen) = 0;
- virtual void cancel() = 0;
+ // The existence of this class needs to be public for the convenience of
+ // the implementation.
+ class ASIOMessageSocketImpl;
+private:
+ ASIOMessageSocketImpl* impl_;
+public:
+ ASIOMessageSocket(ASIOMessageSocketImpl* impl) : impl_(impl) {}
+ virtual ~ASIOMessageSocket();
+ virtual void send(const void* data, size_t datalen);
/// \brief Return the native socket descriptor.
///
/// Provided for debugging purposes only.
- virtual int native() = 0;
+ int native();
};
class ASIOMessageManager : public MessageManager {
View
5 src/lib/dispatcher.cc
@@ -101,10 +101,7 @@ class QueryEvent {
void queryTimerCallback() {
cout << "[Timeout] Query timed out: msg id: " << qid_ << endl;
if (tcp_sock_ != NULL) {
- // Cancel the operation, and release the ownership. The socket
- // will destruct itself.
- tcp_sock_->cancel();
- tcp_sock_ = NULL;
+ clearTCPSocket();
}
restart_callback_(qid_, NULL);
}
View
6 src/lib/message_manager.h
@@ -60,12 +60,6 @@ class MessageSocket : private boost::noncopyable {
virtual ~MessageSocket() {}
virtual void send(const void* data, size_t datalen) = 0;
-
- /// \brief Cancel any pending socket operation.
- ///
- /// This means the ownership is released from the caller. The object
- /// is responsible for destructing itself.
- virtual void cancel() = 0;
};
/// \brief Timers that work with a \c MessageManager.
View
5 src/lib/tests/test_message_manager.cc
@@ -55,11 +55,6 @@ TestMessageSocket::send(const void* data, size_t datalen) {
}
void
-TestMessageSocket::cancel() {
- delete this;
-}
-
-void
TestMessageTimer::start(const boost::posix_time::time_duration& duration) {
++n_started_;
duration_seconds_ = duration.seconds();
View
1 src/lib/tests/test_message_manager.h
@@ -45,7 +45,6 @@ class TestMessageSocket : public MessageSocket {
{}
~TestMessageSocket();
virtual void send(const void* data, size_t datalen);
- virtual void cancel();
std::vector<boost::shared_ptr<isc::dns::Message> > queries_;
Callback callback_;

0 comments on commit 97dce61

Please sign in to comment.