Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
Add TopologyAware to AutoDiscovery service
Browse files Browse the repository at this point in the history
  • Loading branch information
patricioe committed Oct 26, 2011
1 parent b3826b3 commit 820eefa
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 20 deletions.
Expand Up @@ -8,16 +8,12 @@
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.ThriftCluster;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;

import org.apache.cassandra.thrift.AuthenticationRequest;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.EndpointDetails;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,12 +24,15 @@ public class NodeAutoDiscoverService extends BackgroundCassandraHostService {

public static final int DEF_AUTO_DISCOVERY_DELAY = 30;

private DataCenterValidator dataCenterValidator;


public NodeAutoDiscoverService(HConnectionManager connectionManager,
CassandraHostConfigurator cassandraHostConfigurator) {
super(connectionManager, cassandraHostConfigurator);
this.retryDelayInSeconds = cassandraHostConfigurator.getAutoDiscoveryDelayInSeconds();
sf = executor.scheduleWithFixedDelay(new QueryRing(), retryDelayInSeconds,retryDelayInSeconds, TimeUnit.SECONDS);
this.dataCenterValidator = new DataCenterValidator(cassandraHostConfigurator.getAutoDiscoveryDataCenters());
sf = executor.scheduleWithFixedDelay(new QueryRing(), retryDelayInSeconds, retryDelayInSeconds, TimeUnit.SECONDS);
}

@Override
Expand Down Expand Up @@ -84,38 +83,69 @@ public Set<CassandraHost> discoverNodes() {
Set<CassandraHost> existingHosts = connectionManager.getHosts();
Set<CassandraHost> foundHosts = new HashSet<CassandraHost>();

log.info("using existing hosts {}", existingHosts);
if (log.isDebugEnabled()) {
log.debug("using existing hosts {}", existingHosts);
}

try {

String clusterName = connectionManager.getClusterName();

//this could be suspect, but we need this
ThriftCluster cluster = (ThriftCluster) HFactory.getCluster(clusterName);

for(KeyspaceDefinition keyspaceDefinition: cluster.describeKeyspaces()) {
for(KeyspaceDefinition keyspaceDefinition: cluster.describeKeyspaces()) {
if (!keyspaceDefinition.getName().equals(Keyspace.KEYSPACE_SYSTEM)) {
List<TokenRange> tokenRanges = cluster.describeRing(keyspaceDefinition.getName());
for (TokenRange tokenRange : tokenRanges) {
for (String host : tokenRange.getEndpoints()) {
CassandraHost foundHost = new CassandraHost(host,cassandraHostConfigurator.getPort());
if ( !existingHosts.contains(foundHost) ) {
log.info("Found a node we don't know about {} for TokenRange {}", foundHost, tokenRange);
foundHosts.add(foundHost);

for (EndpointDetails endPointDetail : tokenRange.getEndpoint_details()) {
// Check if we are allowed to include this Data Center.
if (dataCenterValidator.validate(endPointDetail.getDatacenter())) {
// Maybe add this host if it's a new host.
CassandraHost foundHost = new CassandraHost(endPointDetail.getHost(), cassandraHostConfigurator.getPort());
if ( !existingHosts.contains(foundHost) ) {
log.info("Found a node we don't know about {} for TokenRange {}", foundHost, tokenRange);
foundHosts.add(foundHost);
}
}
}
}

}
break;
}
}
} catch (Exception e) {
log.error("Discovery Service failed attempt to connect CassandraHost", e);
}
// } finally {
// connectionManager.releaseClient(thriftClient);
// }

return foundHosts;
}


/**
* Abstraction to validate that the discovered nodes belong to a specific datacenters.
*
* @author patricioe (Patricio Echague - patricio@datastax.com)
*
*/
class DataCenterValidator {

Set<String> dataCenters;

public DataCenterValidator(List<String> dataCenters) {
if (dataCenters != null)
this.dataCenters = new HashSet<String>(dataCenters);
}

public boolean validate(String dcName) {
// If the DC is not defined (i.e: single cluster) always returns TRUE.
if (dataCenters == null || dcName == null)
return true;

return dataCenters.contains(dcName);
}
}

}

@@ -1,6 +1,10 @@
package me.prettyprint.cassandra.service;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import com.google.common.collect.Lists;

import me.prettyprint.cassandra.connection.CassandraHostRetryService;
import me.prettyprint.cassandra.connection.HOpTimer;
Expand All @@ -26,11 +30,15 @@ public final class CassandraHostConfigurator implements Serializable {
private ExhaustedPolicy exhaustedPolicy;
private ClockResolution clockResolution = DEF_CLOCK_RESOLUTION;
private boolean useThriftFramedTransport = CassandraHost.DEFAULT_USE_FRAMED_THRIFT_TRANSPORT;

private boolean retryDownedHosts = true;
private boolean autoDiscoverHosts = false;
private int retryDownedHostsQueueSize = CassandraHostRetryService.DEF_QUEUE_SIZE;
private int retryDownedHostsDelayInSeconds = CassandraHostRetryService.DEF_RETRY_DELAY;

private boolean autoDiscoverHosts = false;
private int autoDiscoveryDelayInSeconds = NodeAutoDiscoverService.DEF_AUTO_DISCOVERY_DELAY;
private List<String> autoDiscoveryDataCenters;

private LoadBalancingPolicy loadBalancingPolicy = new RoundRobinBalancingPolicy();
public static final ClockResolution DEF_CLOCK_RESOLUTION = HFactory.createClockResolution(ClockResolution.MICROSECONDS_SYNC);
private int hostTimeoutCounter = HostTimeoutTracker.DEF_TIMEOUT_COUNTER;
Expand All @@ -42,6 +50,7 @@ public final class CassandraHostConfigurator implements Serializable {
private boolean useSocketKeepalive = false;
private HOpTimer opTimer = new NullOpTimer();


public CassandraHostConfigurator() {
this.hosts = null;
}
Expand Down Expand Up @@ -215,6 +224,32 @@ public void setAutoDiscoveryDelayInSeconds(int autoDiscoveryDelayInSeconds) {
this.autoDiscoveryDelayInSeconds = autoDiscoveryDelayInSeconds;
}

/**
* Sets the local datacenter for the DiscoveryService. Nodes out of this
* datacenter will be discarded.
* @param dataCenter DataCenter name
*/
public void setAutoDiscoveryDataCenter(String dataCenter) {
this.autoDiscoveryDataCenters = Arrays.asList(dataCenter);
}

/**
* Sets the datacenters for the DiscoveryService. Nodes out of these
* datacenters will be discarded.
*/
public void setAutoDiscoveryDataCenter(List<String> dataCenters) {
this.autoDiscoveryDataCenters = dataCenters;
}

/**
* Retrieves the 'local' datacenter names that the DiscoveryService recognizes as valid
* in order to discover new hosts.
* @return a list of 'local' datacenter names
*/
public List<String> getAutoDiscoveryDataCenters() {
return autoDiscoveryDataCenters;
}

public LoadBalancingPolicy getLoadBalancingPolicy() {
return loadBalancingPolicy;
}
Expand Down

0 comments on commit 820eefa

Please sign in to comment.