diff --git a/include/cassandra.h b/include/cassandra.h
index d08f64b69..49e9c9ae0 100644
--- a/include/cassandra.h
+++ b/include/cassandra.h
@@ -1974,6 +1974,23 @@ CASS_EXPORT CASS_DEPRECATED(CassError
cass_cluster_set_pending_requests_low_water_mark(CassCluster* cluster,
unsigned num_requests));
+/**
+ * Alters the strategy of picking connections from connection pool for a given host.
+ * By default the connection chosen is the one with least pending requests.
+ * When this function is called, then usual round-robin will used.
+ *
+ * This should not be confused with load balancing while doing node selection. What is
+ * being load-balanced here are the connections to a single node.
+ *
+ * Default: cass_false (no round-robin)
+ *
+ * @public @memberof CassCluster
+ *
+ * @param[in] cluster
+ */
+CASS_EXPORT void
+cass_cluster_set_round_robin_on_node_connections(CassCluster* cluster);
+
/**
* Sets the timeout for connecting to a node.
*
diff --git a/src/cluster_config.cpp b/src/cluster_config.cpp
index 9fac5b326..cfa303354 100644
--- a/src/cluster_config.cpp
+++ b/src/cluster_config.cpp
@@ -233,6 +233,10 @@ CassError cass_cluster_set_pending_requests_low_water_mark(CassCluster* cluster,
return CASS_OK;
}
+void cass_cluster_set_round_robin_on_node_connections(CassCluster* cluster) {
+ cluster->config().set_connection_pool_round_robin(true);
+}
+
void cass_cluster_set_connect_timeout(CassCluster* cluster, unsigned timeout_ms) {
cluster->config().set_connect_timeout(timeout_ms);
}
diff --git a/src/config.hpp b/src/config.hpp
index 6d34cd6b2..8517c4b58 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -74,7 +74,8 @@ class Config {
, is_client_id_set_(false)
, host_listener_(new DefaultHostListener())
, monitor_reporting_interval_secs_(CASS_DEFAULT_CLIENT_MONITOR_EVENTS_INTERVAL_SECS)
- , cluster_metadata_resolver_factory_(new DefaultClusterMetadataResolverFactory()) {
+ , cluster_metadata_resolver_factory_(new DefaultClusterMetadataResolverFactory())
+ , connection_pool_round_robin_(CASS_DEFAULT_CONNECTION_POOL_ROUND_ROBIN) {
profiles_.set_empty_key(String());
// Assign the defaults to the cluster profile
@@ -392,6 +393,11 @@ class Config {
}
}
+ bool connection_pool_round_robin() const { return connection_pool_round_robin_; }
+ void set_connection_pool_round_robin(bool new_val) {
+ connection_pool_round_robin_ = new_val;
+ }
+
private:
void init_profiles();
@@ -441,6 +447,7 @@ class Config {
unsigned monitor_reporting_interval_secs_;
CloudSecureConnectionConfig cloud_secure_connection_config_;
ClusterMetadataResolverFactory::Ptr cluster_metadata_resolver_factory_;
+ bool connection_pool_round_robin_;
};
}}} // namespace datastax::internal::core
diff --git a/src/connection_pool.cpp b/src/connection_pool.cpp
index d22d73c11..fdf7c4103 100644
--- a/src/connection_pool.cpp
+++ b/src/connection_pool.cpp
@@ -39,12 +39,14 @@ static inline bool least_busy_comp(const PooledConnection::Ptr& a, const PooledC
ConnectionPoolSettings::ConnectionPoolSettings()
: num_connections_per_host(CASS_DEFAULT_NUM_CONNECTIONS_PER_HOST)
- , reconnection_policy(new ExponentialReconnectionPolicy()) {}
+ , reconnection_policy(new ExponentialReconnectionPolicy())
+ , round_robin(CASS_DEFAULT_CONNECTION_POOL_ROUND_ROBIN) {}
ConnectionPoolSettings::ConnectionPoolSettings(const Config& config)
: connection_settings(config)
, num_connections_per_host(config.core_connections_per_host())
- , reconnection_policy(config.reconnection_policy()) {}
+ , reconnection_policy(config.reconnection_policy())
+ , round_robin(config.connection_pool_round_robin()) {}
class NopConnectionPoolListener : public ConnectionPoolListener {
public:
@@ -72,7 +74,8 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
, settings_(settings)
, metrics_(metrics)
, close_state_(CLOSE_STATE_OPEN)
- , notify_state_(NOTIFY_STATE_NEW) {
+ , notify_state_(NOTIFY_STATE_NEW)
+ , next_connection_idx_(0u) {
inc_ref(); // Reference for the lifetime of the pooled connections
set_pointer_keys(reconnection_schedules_);
set_pointer_keys(to_flush_);
@@ -95,13 +98,28 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
}
}
-PooledConnection::Ptr ConnectionPool::find_least_busy() const {
- PooledConnection::Vec::const_iterator it =
- std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
- if (it == connections_.end() || (*it)->is_closing()) {
+PooledConnection::Ptr ConnectionPool::find_connection() const {
+ if (!has_connections()) {
return PooledConnection::Ptr();
}
- return *it;
+
+ PooledConnection::Ptr conn_ptr;
+ if (settings_.round_robin) {
+ size_t idx = next_connection_idx_.load(std::memory_order_acquire);
+ size_t new_idx;
+ do {
+ new_idx = (idx + 1u) % connections_.size();
+ } while (!next_connection_idx_.compare_exchange_weak(idx, new_idx, std::memory_order_acq_rel));
+ conn_ptr = connections_[idx];
+ } else {
+ conn_ptr = *std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
+ }
+
+ if (conn_ptr && conn_ptr->is_closing()) {
+ conn_ptr = PooledConnection::Ptr();
+ }
+ LOG_TRACE("Chosen connection: %p", (void*)conn_ptr.get());
+ return conn_ptr;
}
bool ConnectionPool::has_connections() const { return !connections_.empty(); }
diff --git a/src/connection_pool.hpp b/src/connection_pool.hpp
index cd6a5e89c..3a5cc334b 100644
--- a/src/connection_pool.hpp
+++ b/src/connection_pool.hpp
@@ -25,6 +25,8 @@
#include
+#include
+
namespace datastax { namespace internal { namespace core {
class ConnectionPool;
@@ -94,6 +96,7 @@ struct ConnectionPoolSettings {
ConnectionSettings connection_settings;
size_t num_connections_per_host;
ReconnectionPolicy::Ptr reconnection_policy;
+ bool round_robin;
};
/**
@@ -130,12 +133,17 @@ class ConnectionPool : public RefCounted {
Metrics* metrics);
/**
- * Find the least busy connection for the pool. The least busy connection has
- * the lowest number of outstanding requests and is not closed.
+ * Find a connection in the pool. The behavior depends on the value of
+ * `ConnectionPoolSettings::round_robin`:
+ * - if it's true, then the returned connection is the next connection
+ * according to round-robin policy;
+ * - if it's false, the "least busy" connection is chosen.
+ * (The "least busy" connection has the lowest number of outstanding requests).
+ * If the connection found is closed, empty pointer is returned.
*
- * @return The least busy connection or null if no connection is available.
+ * @return The chosen connection or null if no connection is available.
*/
- PooledConnection::Ptr find_least_busy() const;
+ PooledConnection::Ptr find_connection() const;
/**
* Determine if the pool has any valid connections.
@@ -235,6 +243,9 @@ class ConnectionPool : public RefCounted {
PooledConnection::Vec connections_;
DelayedConnector::Vec pending_connections_;
DenseHashSet to_flush_;
+
+ /// Used when `ConnectionPoolSettings::round_robin` is set to true
+ mutable std::atomic next_connection_idx_;
};
}}} // namespace datastax::internal::core
diff --git a/src/connection_pool_manager.cpp b/src/connection_pool_manager.cpp
index 74c50243f..082b9aa0c 100644
--- a/src/connection_pool_manager.cpp
+++ b/src/connection_pool_manager.cpp
@@ -62,12 +62,12 @@ ConnectionPoolManager::ConnectionPoolManager(const ConnectionPool::Map& pools, u
}
}
-PooledConnection::Ptr ConnectionPoolManager::find_least_busy(const Address& address) const {
+PooledConnection::Ptr ConnectionPoolManager::find_connection(const Address& address) const {
ConnectionPool::Map::const_iterator it = pools_.find(address);
if (it == pools_.end()) {
return PooledConnection::Ptr();
}
- return it->second->find_least_busy();
+ return it->second->find_connection();
}
bool ConnectionPoolManager::has_connections(const Address& address) const {
diff --git a/src/connection_pool_manager.hpp b/src/connection_pool_manager.hpp
index 1633e1cde..bc39788cf 100644
--- a/src/connection_pool_manager.hpp
+++ b/src/connection_pool_manager.hpp
@@ -78,13 +78,21 @@ class ConnectionPoolManager
const ConnectionPoolSettings& settings);
/**
- * Find the least busy connection for a given host.
- *
- * @param address The address of the host to find a least busy connection.
- * @return The least busy connection for a host or null if no connections are
- * available.
+ * Finds the the next connection for a given host according to round-robin policy,
+ * (when `settings_::round_robin == true`). Otherwise finds the least
+ * busy connection.
+ *
+ * @param address The address of the host to find a connection to.
+ * @return The connection selected by policy in pool's settings or
+ * null if no connections are available.
+ */
+ PooledConnection::Ptr find_connection(const Address& address) const;
+
+ /**
+ * Kept for backward compatibility (unit tests).
+ * The tests will pass only when intranode round-robin is DISABLED (default).
*/
- PooledConnection::Ptr find_least_busy(const Address& address) const;
+ PooledConnection::Ptr find_least_busy(const Address& address) const { return find_connection(address); }
/**
* Determine if a pool has any valid connections.
diff --git a/src/constants.hpp b/src/constants.hpp
index 2d2a743a7..d5229f5bf 100644
--- a/src/constants.hpp
+++ b/src/constants.hpp
@@ -142,6 +142,7 @@
#define CASS_DEFAULT_MAX_TRACING_DATA_WAIT_TIME_MS 15
#define CASS_DEFAULT_RETRY_TRACING_DATA_WAIT_TIME_MS 3
#define CASS_DEFAULT_TRACING_CONSISTENCY CASS_CONSISTENCY_ONE
+#define CASS_DEFAULT_CONNECTION_POOL_ROUND_ROBIN false
// Request-level defaults
#define CASS_DEFAULT_CONSISTENCY CASS_CONSISTENCY_LOCAL_ONE
diff --git a/src/request_handler.cpp b/src/request_handler.cpp
index 30b2e4d1e..cbece9fca 100644
--- a/src/request_handler.cpp
+++ b/src/request_handler.cpp
@@ -357,8 +357,7 @@ void RequestHandler::internal_retry(RequestExecution* request_execution) {
bool is_done = false;
while (!is_done && request_execution->current_host()) {
- PooledConnection::Ptr connection =
- manager_->find_least_busy(request_execution->current_host()->address());
+ PooledConnection::Ptr connection = manager_->find_connection(request_execution->current_host()->address());
if (connection) {
int32_t result = connection->write(request_execution);
diff --git a/src/request_processor.cpp b/src/request_processor.cpp
index 47e22fbca..1d91366d4 100644
--- a/src/request_processor.cpp
+++ b/src/request_processor.cpp
@@ -378,7 +378,7 @@ bool RequestProcessor::on_prepare_all(const RequestHandler::Ptr& request_handler
PrepareAllCallback::Ptr prepare_all_callback(
new PrepareAllCallback(address, prepare_all_handler));
- PooledConnection::Ptr connection(connection_pool_manager_->find_least_busy(address));
+ PooledConnection::Ptr connection = connection_pool_manager_->find_connection(address);
if (connection) {
connection->write(prepare_all_callback.get());
}
@@ -579,8 +579,8 @@ int RequestProcessor::process_requests(uint64_t processing_time) {
bool RequestProcessor::write_wait_callback(const RequestHandler::Ptr& request_handler,
const Host::Ptr& current_host,
const RequestCallback::Ptr& callback) {
- PooledConnection::Ptr connection(
- connection_pool_manager_->find_least_busy(current_host->address()));
+ PooledConnection::Ptr connection = connection_pool_manager_->find_connection(current_host->address());
+
if (connection && connection->write(callback.get()) > 0) {
// Stop the original request timer now that we have a response and
// are waiting for the maximum wait time of the handler.