Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions include/cassandra.h
Original file line number Diff line number Diff line change
Expand Up @@ -1974,6 +1974,23 @@ CASS_EXPORT CASS_DEPRECATED(CassError
cass_cluster_set_pending_requests_low_water_mark(CassCluster* cluster,
unsigned num_requests));

/**
* Alters the strategy of picking connections from connection pool for a given host.
* By default the connection chosen is the one with least pending requests.
* When this function is called, then usual round-robin will used.
*
* This should not be confused with load balancing while doing node selection. What is
* being load-balanced here are the <b>connections</b> to a single node.
*
* <b>Default:</b> cass_false (no round-robin)
*
* @public @memberof CassCluster
*
* @param[in] cluster
*/
CASS_EXPORT void
cass_cluster_set_round_robin_on_node_connections(CassCluster* cluster);

/**
* Sets the timeout for connecting to a node.
*
Expand Down
4 changes: 4 additions & 0 deletions src/cluster_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ CassError cass_cluster_set_pending_requests_low_water_mark(CassCluster* cluster,
return CASS_OK;
}

void cass_cluster_set_round_robin_on_node_connections(CassCluster* cluster) {
cluster->config().set_connection_pool_round_robin(true);
}

void cass_cluster_set_connect_timeout(CassCluster* cluster, unsigned timeout_ms) {
cluster->config().set_connect_timeout(timeout_ms);
}
Expand Down
9 changes: 8 additions & 1 deletion src/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class Config {
, is_client_id_set_(false)
, host_listener_(new DefaultHostListener())
, monitor_reporting_interval_secs_(CASS_DEFAULT_CLIENT_MONITOR_EVENTS_INTERVAL_SECS)
, cluster_metadata_resolver_factory_(new DefaultClusterMetadataResolverFactory()) {
, cluster_metadata_resolver_factory_(new DefaultClusterMetadataResolverFactory())
, connection_pool_round_robin_(CASS_DEFAULT_CONNECTION_POOL_ROUND_ROBIN) {
profiles_.set_empty_key(String());

// Assign the defaults to the cluster profile
Expand Down Expand Up @@ -392,6 +393,11 @@ class Config {
}
}

bool connection_pool_round_robin() const { return connection_pool_round_robin_; }
void set_connection_pool_round_robin(bool new_val) {
connection_pool_round_robin_ = new_val;
}

private:
void init_profiles();

Expand Down Expand Up @@ -441,6 +447,7 @@ class Config {
unsigned monitor_reporting_interval_secs_;
CloudSecureConnectionConfig cloud_secure_connection_config_;
ClusterMetadataResolverFactory::Ptr cluster_metadata_resolver_factory_;
bool connection_pool_round_robin_;
};

}}} // namespace datastax::internal::core
Expand Down
34 changes: 26 additions & 8 deletions src/connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ static inline bool least_busy_comp(const PooledConnection::Ptr& a, const PooledC

ConnectionPoolSettings::ConnectionPoolSettings()
: num_connections_per_host(CASS_DEFAULT_NUM_CONNECTIONS_PER_HOST)
, reconnection_policy(new ExponentialReconnectionPolicy()) {}
, reconnection_policy(new ExponentialReconnectionPolicy())
, round_robin(CASS_DEFAULT_CONNECTION_POOL_ROUND_ROBIN) {}

ConnectionPoolSettings::ConnectionPoolSettings(const Config& config)
: connection_settings(config)
, num_connections_per_host(config.core_connections_per_host())
, reconnection_policy(config.reconnection_policy()) {}
, reconnection_policy(config.reconnection_policy())
, round_robin(config.connection_pool_round_robin()) {}

class NopConnectionPoolListener : public ConnectionPoolListener {
public:
Expand Down Expand Up @@ -72,7 +74,8 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
, settings_(settings)
, metrics_(metrics)
, close_state_(CLOSE_STATE_OPEN)
, notify_state_(NOTIFY_STATE_NEW) {
, notify_state_(NOTIFY_STATE_NEW)
, next_connection_idx_(0u) {
inc_ref(); // Reference for the lifetime of the pooled connections
set_pointer_keys(reconnection_schedules_);
set_pointer_keys(to_flush_);
Expand All @@ -95,13 +98,28 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
}
}

PooledConnection::Ptr ConnectionPool::find_least_busy() const {
PooledConnection::Vec::const_iterator it =
std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
if (it == connections_.end() || (*it)->is_closing()) {
PooledConnection::Ptr ConnectionPool::find_connection() const {
if (!has_connections()) {
return PooledConnection::Ptr();
}
return *it;

PooledConnection::Ptr conn_ptr;
if (settings_.round_robin) {
size_t idx = next_connection_idx_.load(std::memory_order_acquire);
size_t new_idx;
do {
new_idx = (idx + 1u) % connections_.size();
} while (!next_connection_idx_.compare_exchange_weak(idx, new_idx, std::memory_order_acq_rel));
conn_ptr = connections_[idx];
} else {
conn_ptr = *std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
}

if (conn_ptr && conn_ptr->is_closing()) {
conn_ptr = PooledConnection::Ptr();
}
LOG_TRACE("Chosen connection: %p", (void*)conn_ptr.get());
return conn_ptr;
}

bool ConnectionPool::has_connections() const { return !connections_.empty(); }
Expand Down
19 changes: 15 additions & 4 deletions src/connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

#include <uv.h>

#include <atomic>

namespace datastax { namespace internal { namespace core {

class ConnectionPool;
Expand Down Expand Up @@ -94,6 +96,7 @@ struct ConnectionPoolSettings {
ConnectionSettings connection_settings;
size_t num_connections_per_host;
ReconnectionPolicy::Ptr reconnection_policy;
bool round_robin;
};

/**
Expand Down Expand Up @@ -130,12 +133,17 @@ class ConnectionPool : public RefCounted<ConnectionPool> {
Metrics* metrics);

/**
* Find the least busy connection for the pool. The least busy connection has
* the lowest number of outstanding requests and is not closed.
* Find a connection in the pool. The behavior depends on the value of
* `ConnectionPoolSettings::round_robin`:
* - if it's true, then the returned connection is the next connection
* according to round-robin policy;
* - if it's false, the "least busy" connection is chosen.
* (The "least busy" connection has the lowest number of outstanding requests).
* If the connection found is closed, empty pointer is returned.
*
* @return The least busy connection or null if no connection is available.
* @return The chosen connection or null if no connection is available.
*/
PooledConnection::Ptr find_least_busy() const;
PooledConnection::Ptr find_connection() const;

/**
* Determine if the pool has any valid connections.
Expand Down Expand Up @@ -235,6 +243,9 @@ class ConnectionPool : public RefCounted<ConnectionPool> {
PooledConnection::Vec connections_;
DelayedConnector::Vec pending_connections_;
DenseHashSet<PooledConnection*> to_flush_;

/// Used when `ConnectionPoolSettings::round_robin` is set to true
mutable std::atomic<size_t> next_connection_idx_;
};

}}} // namespace datastax::internal::core
Expand Down
4 changes: 2 additions & 2 deletions src/connection_pool_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ ConnectionPoolManager::ConnectionPoolManager(const ConnectionPool::Map& pools, u
}
}

PooledConnection::Ptr ConnectionPoolManager::find_least_busy(const Address& address) const {
PooledConnection::Ptr ConnectionPoolManager::find_connection(const Address& address) const {
ConnectionPool::Map::const_iterator it = pools_.find(address);
if (it == pools_.end()) {
return PooledConnection::Ptr();
}
return it->second->find_least_busy();
return it->second->find_connection();
}

bool ConnectionPoolManager::has_connections(const Address& address) const {
Expand Down
20 changes: 14 additions & 6 deletions src/connection_pool_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,21 @@ class ConnectionPoolManager
const ConnectionPoolSettings& settings);

/**
* Find the least busy connection for a given host.
*
* @param address The address of the host to find a least busy connection.
* @return The least busy connection for a host or null if no connections are
* available.
* Finds the the next connection for a given host according to round-robin policy,
* (when `settings_::round_robin == true`). Otherwise finds the least
* busy connection.
*
* @param address The address of the host to find a connection to.
* @return The connection selected by policy in pool's settings or
* null if no connections are available.
*/
PooledConnection::Ptr find_connection(const Address& address) const;

/**
* Kept for backward compatibility (unit tests).
* The tests will pass only when intranode round-robin is DISABLED (default).
*/
PooledConnection::Ptr find_least_busy(const Address& address) const;
PooledConnection::Ptr find_least_busy(const Address& address) const { return find_connection(address); }

/**
* Determine if a pool has any valid connections.
Expand Down
1 change: 1 addition & 0 deletions src/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
#define CASS_DEFAULT_MAX_TRACING_DATA_WAIT_TIME_MS 15
#define CASS_DEFAULT_RETRY_TRACING_DATA_WAIT_TIME_MS 3
#define CASS_DEFAULT_TRACING_CONSISTENCY CASS_CONSISTENCY_ONE
#define CASS_DEFAULT_CONNECTION_POOL_ROUND_ROBIN false

// Request-level defaults
#define CASS_DEFAULT_CONSISTENCY CASS_CONSISTENCY_LOCAL_ONE
Expand Down
3 changes: 1 addition & 2 deletions src/request_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,7 @@ void RequestHandler::internal_retry(RequestExecution* request_execution) {

bool is_done = false;
while (!is_done && request_execution->current_host()) {
PooledConnection::Ptr connection =
manager_->find_least_busy(request_execution->current_host()->address());
PooledConnection::Ptr connection = manager_->find_connection(request_execution->current_host()->address());
if (connection) {
int32_t result = connection->write(request_execution);

Expand Down
6 changes: 3 additions & 3 deletions src/request_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ bool RequestProcessor::on_prepare_all(const RequestHandler::Ptr& request_handler
PrepareAllCallback::Ptr prepare_all_callback(
new PrepareAllCallback(address, prepare_all_handler));

PooledConnection::Ptr connection(connection_pool_manager_->find_least_busy(address));
PooledConnection::Ptr connection = connection_pool_manager_->find_connection(address);
if (connection) {
connection->write(prepare_all_callback.get());
}
Expand Down Expand Up @@ -579,8 +579,8 @@ int RequestProcessor::process_requests(uint64_t processing_time) {
bool RequestProcessor::write_wait_callback(const RequestHandler::Ptr& request_handler,
const Host::Ptr& current_host,
const RequestCallback::Ptr& callback) {
PooledConnection::Ptr connection(
connection_pool_manager_->find_least_busy(current_host->address()));
PooledConnection::Ptr connection = connection_pool_manager_->find_connection(current_host->address());

if (connection && connection->write(callback.get()) > 0) {
// Stop the original request timer now that we have a response and
// are waiting for the maximum wait time of the handler.
Expand Down