Permalink
Browse files

Fix problem with connection pool deadlocks

  • Loading branch information...
1 parent a63a572 commit d6afc5b9770e25111d9142066ad5f656d8d58e56 @readams readams committed Jun 19, 2009
@@ -40,6 +40,15 @@ Connection::Connection(string& hostName,
: config(conf), host(hostName), port(portNum), io_service(),
resolver(io_service), timer(io_service), socket(io_service),
connbuf(NULL), connstream(NULL), active(false) {
+}
+
+Connection::~Connection() {
+ close();
+ if (connstream) delete connstream;
+ if (connbuf) delete connbuf;
+}
+
+void Connection::connect() {
// Start an asynchronous resolve to translate the server and service names
// into a list of endpoints.
tcp::resolver::query query(host, port);
@@ -48,13 +57,7 @@ Connection::Connection(string& hostName,
asio::placeholders::error,
asio::placeholders::iterator));
- wait_for_operation(conf->getConnectionTimeoutMs());
-}
-
-Connection::~Connection() {
- close();
- if (connstream) delete connstream;
- if (connbuf) delete connbuf;
+ wait_for_operation(config->getConnectionTimeoutMs());
}
void Connection::check_error(const system::error_code& err) {
@@ -64,10 +67,6 @@ void Connection::check_error(const system::error_code& err) {
}
}
-static void set_result(optional<system::error_code>* a, system::error_code b) {
- a->reset(b);
-}
-
void Connection::wait_for_operation(long millis)
{
op_timeout = false;
@@ -161,12 +160,14 @@ size_t Connection::read_some(char* buffer, size_t bufferLen) {
return (size_t)bytes;
}
+#if 0
void Connection::handle_data_op(const system::error_code& err,
size_t transferred) {
check_error(err);
bytesTransferred = transferred;
op_complete = true;
}
+#endif
size_t Connection::write(const char* buffer, size_t bufferLen) {
#if 0
@@ -45,52 +45,66 @@ shared_ptr<Connection>& ConnectionPool::checkout(string& host, int port) {
stringstream hostKey;
hostKey << host << ":" << port;
- host_entry_ptr& hep = pool[hostKey.str()];
+ shared_ptr<Connection>* connRet = NULL;
{
- lock_guard<mutex> guard(poolMutex);
+ // Ensure that we don't have too many connections in the pool
+ unique_lock<mutex> lock(poolMutex);
+
+ host_entry_ptr& hep = pool[hostKey.str()];
+ int& host_ready_count = ready_count[hostKey.str()];
if (!hep.get()) {
hep = host_entry_ptr(new host_entry());
+ host_ready_count = 0;
}
- // Search for an active connection to try
- host_entry::iterator heit;
- for (heit = hep->begin(); heit != hep->end(); ++heit) {
- if (heit->second.first == STATUS_READY) {
- heit->second.first = STATUS_CHECKED_OUT;
- return heit->second.second;
- }
+ while ((host_ready_count == 0) &&
+ (((clientConfig->getMaxConnectionsPerNode() > 0) &&
+ (hep->size() >= (size_t)clientConfig->getMaxConnectionsPerNode())) ||
+ ((clientConfig->getMaxTotalConnections() > 0) &&
+ (totalConnections >= clientConfig->getMaxTotalConnections())))) {
+ checkinCond.wait(lock);
}
- }
- // Fall back to creating a new connection
- {
- // Ensure that we don't have too many connections in the pool
- unique_lock<mutex> lock(poolMutex);
-
- while (((clientConfig->getMaxConnectionsPerNode() > 0) &&
- (hep->size() >= (size_t)clientConfig->getMaxConnectionsPerNode())) ||
- ((clientConfig->getMaxTotalConnections() > 0) &&
- (totalConnections >= clientConfig->getMaxTotalConnections()))) {
- checkinCond.wait(lock);
+ // Search for a ready connection to try
+ if (host_ready_count > 0) {
+ host_entry::iterator heit;
+ for (heit = hep->begin(); heit != hep->end(); ++heit) {
+ if (heit->second.first == STATUS_READY) {
+ heit->second.first = STATUS_CHECKED_OUT;
+ host_ready_count -= 1;
+ return heit->second.second;
+ }
+ }
}
+
+ // Otherwise fall back to creating a new connection
+
+ // Reserve a spot in the pool
+ stringstream portStr;
+ portStr << port;
+ std::string portString = portStr.str();
+ //cout << "Creating connection " << hostKey.str() << endl;
+ shared_ptr<Connection> conn(new Connection(host, portString,
+ clientConfig));
+ (*hep)[(size_t)conn.get()] = make_pair((int)STATUS_UNINIT, conn);
+ totalConnections += 1;
+ connRet = &(((*hep)[(size_t)conn.get()]).second);
}
- stringstream portStr;
- portStr << port;
- std::string portString = portStr.str();
- // cout << "Allocating new Connection " << hostKey.str() << endl;
+ // Connect to the remote host
+ try {
+ (*connRet)->connect();
+ } catch (...) {
+ checkin(*connRet);
+ throw;
+ }
- shared_ptr<Connection> conn(new Connection(host, portString,
- clientConfig));
- shared_ptr<Connection>* connRet = NULL;
{
- lock_guard<mutex> guard(poolMutex);
-
- (*hep)[(size_t)conn.get()] = make_pair((int)STATUS_CHECKED_OUT, conn);
- totalConnections += 1;
- connRet = &(((*hep)[(size_t)conn.get()]).second);
+ lock_guard<mutex> guard(poolMutex);
+ (*pool[hostKey.str()])[(size_t)connRet->get()].first =
+ (int)STATUS_CHECKED_OUT;
}
return *connRet;
@@ -113,7 +127,10 @@ void ConnectionPool::checkin(shared_ptr<Connection>& conn) {
if (ce.first != STATUS_CHECKED_OUT || !conn->is_active()) {
/* Something horrible has happened to our connection */
hep->erase((size_t)conn.get());
+ totalConnections -= 1;
+ //cout << "Destroying connection " << hostKey << endl;
} else {
+ ready_count[hostKey] += 1;
ce.first = STATUS_READY;
}
}
@@ -55,6 +55,11 @@ class Connection {
~Connection();
/**
+ * Connect to the remote host
+ */
+ void connect();
+
+ /**
* Close the underlying connection. After calling this the
* Connection object cannot be used.
*/
@@ -117,8 +122,10 @@ class Connection {
void handle_resolve(const system::error_code& err,
tcp::resolver::iterator endpoint_iterator);
+#if 0
void handle_data_op(const system::error_code& err,
size_t transferred);
+#endif
void check_error(const system::error_code& err);
shared_ptr<ClientConfig> config;
@@ -79,8 +79,10 @@ class ConnectionPool
typedef map<size_t, conn_entry> host_entry;
typedef shared_ptr<host_entry> host_entry_ptr;
typedef map<string, host_entry_ptr> conn_pool;
+ typedef map<string, int> conn_pool_ready_count;
conn_pool pool;
+ conn_pool_ready_count ready_count;
int totalConnections;
mutex poolMutex;

0 comments on commit d6afc5b

Please sign in to comment.