Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
Fixed issue with highscalelib concept of equals/hashcode and cassandr…
Browse files Browse the repository at this point in the history
…ahost
  • Loading branch information
zznate committed Nov 14, 2010
1 parent 4991fb8 commit 20657ef
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
Expand Up @@ -64,7 +64,7 @@ public HConnectionManager(CassandraHostConfigurator cassandraHostConfigurator) {
}

public void addCassandraHost(CassandraHost cassandraHost) {
if ( !hostPools.containsKey(cassandraHost) ) {
if ( !getHosts().contains(cassandraHost) ) {
hostPools.put(cassandraHost, new ConcurrentHClientPool(cassandraHost));
log.info("Added host {} to pool", cassandraHost.getName());
} else {
Expand All @@ -73,7 +73,7 @@ public void addCassandraHost(CassandraHost cassandraHost) {
}

public Set<CassandraHost> getHosts() {
return Collections.unmodifiableSet(hostPools.keySet());
return Collections.unmodifiableSet(new HashSet<CassandraHost>(hostPools.keySet()));
}


Expand Down
Expand Up @@ -6,7 +6,9 @@
import java.util.concurrent.TimeUnit;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.cassandra.thrift.Cassandra.Client;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
Expand All @@ -18,6 +20,10 @@
import me.prettyprint.cassandra.connection.CassandraHostRetryService.RetryRunner;
import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.service.Operation;
import me.prettyprint.cassandra.service.OperationType;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.exceptions.HectorException;


public class NodeAutoDiscoverService extends BackgroundCassandraHostService {
Expand All @@ -27,10 +33,12 @@ public class NodeAutoDiscoverService extends BackgroundCassandraHostService {
private CassandraHost cassandraHost;
public static final int DEF_AUTO_DISCOVERY_DELAY = 30;


public NodeAutoDiscoverService(HConnectionManager connectionManager,
CassandraHostConfigurator cassandraHostConfigurator) {
super(connectionManager, cassandraHostConfigurator);
sf = executor.scheduleWithFixedDelay(new QueryRing(), this.retryDelayInSeconds,this.retryDelayInSeconds, TimeUnit.SECONDS);
super(connectionManager, cassandraHostConfigurator);
this.retryDelayInSeconds = cassandraHostConfigurator.getAutoDiscoveryDelayInSeconds();
sf = executor.scheduleWithFixedDelay(new QueryRing(), retryDelayInSeconds,retryDelayInSeconds, TimeUnit.SECONDS);
}

void shutdown() {
Expand Down Expand Up @@ -73,30 +81,32 @@ public Set<CassandraHost> discoverNodes() {
Set<CassandraHost> foundHosts = new HashSet<CassandraHost>();

HThriftClient thriftClient = null;
log.info("using existing hosts {}", existingHosts);
try {
thriftClient = connectionManager.borrowClient();
List<TokenRange> tokens = thriftClient.getCassandra().describe_ring("System");
for (TokenRange tokenRange : tokens) {
if ( log.isDebugEnabled() ) {
log.debug("Looking over TokenRange {} for new hosts", tokenRange);
}
List<String> endpoints = tokenRange.getEndpoints();
for (String endpoint : endpoints) {
CassandraHost foundHost = new CassandraHost(endpoint,cassandraHostConfigurator.getPort());
if ( !existingHosts.contains(foundHost) ) {
log.info("Found a node we don't know about {} for TokenRange {}", foundHost, tokenRange);
foundHosts.add(foundHost);

for (KsDef keyspace : thriftClient.getCassandra().describe_keyspaces()) {
if (!keyspace.getName().equals(Keyspace.KEYSPACE_SYSTEM)) {
List<TokenRange> tokenRanges = thriftClient.getCassandra().describe_ring(keyspace.getName());
for (TokenRange tokenRange : tokenRanges) {
for (String host : tokenRange.getEndpoints()) {
CassandraHost foundHost = new CassandraHost(host,cassandraHostConfigurator.getPort());
if ( !existingHosts.contains(foundHost) ) {
log.info("Found a node we don't know about {} for TokenRange {}", foundHost, tokenRange);
foundHosts.add(foundHost);
}
}
}
break;
}

}
}
} catch (Exception e) {
//log.error("Downed Host retry failed attempt to verify CassandraHost", e);
log.error("Downed Host retry failed attempt to verify CassandraHost", e);
} finally {
connectionManager.releaseClient(thriftClient);
}
return foundHosts;
}

}

0 comments on commit 20657ef

Please sign in to comment.