Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
added configurable schema agreement settings for issue hector-client#302
Browse files Browse the repository at this point in the history
  • Loading branch information
zznate committed Oct 19, 2011
1 parent 80056e5 commit 7698127
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 7 deletions.
Expand Up @@ -48,8 +48,9 @@ public abstract class AbstractCluster implements Cluster {
/**
* Linked to Cassandra StorageProxy.
*/
private static final int RING_DELAY = 30 * 1000; // delay after which we assume ring has stablized

public static final int RING_DELAY_DEF = 30 * 1000; // delay after which we assume ring has stablized
public static final int WAIT_FOR_SCHEMA_AGREEMENT_SLEEP_TIME = 1000;

protected final HConnectionManager connectionManager;
private final String name;
private final CassandraHostConfigurator configurator;
Expand All @@ -60,6 +61,9 @@ public abstract class AbstractCluster implements Cluster {
private Set<CassandraHost> knownPoolHosts;
protected final ExceptionsTranslator xtrans;
private final Map<String, String> credentials;
private final int ringDelay;
private final int schemaAgreementSleepTime;


public AbstractCluster(String clusterName, CassandraHostConfigurator cassandraHostConfigurator) {
this(clusterName, cassandraHostConfigurator, EMPTY_CREDENTIALS);
Expand All @@ -74,6 +78,8 @@ public AbstractCluster(String clusterName, CassandraHostConfigurator cassandraHo
xtrans = new ExceptionsTranslatorImpl();
clockResolution = cassandraHostConfigurator.getClockResolution();
this.credentials = Collections.unmodifiableMap(credentials);
ringDelay = cassandraHostConfigurator.getRingDelay();
schemaAgreementSleepTime = cassandraHostConfigurator.getSchemaAgreementSleepTime();
}

@Override
Expand Down Expand Up @@ -303,7 +309,7 @@ public Void execute(Cassandra.Client cassandra) throws HectorException {
}


protected static void waitForSchemaAgreement(Cassandra.Client cassandra) throws InvalidRequestException, TException, InterruptedException {
protected void waitForSchemaAgreement(Cassandra.Client cassandra) throws InvalidRequestException, TException, InterruptedException {
int waited = 0;
int versions = 0;
while (versions != 1) {
Expand All @@ -315,10 +321,10 @@ protected static void waitForSchemaAgreement(Cassandra.Client cassandra) throws
}

if (versions != 1) {
Thread.sleep(1000);
waited += 1000;
if (waited > RING_DELAY)
throw new RuntimeException("Could not reach schema agreement in " + RING_DELAY + "ms");
Thread.sleep(schemaAgreementSleepTime);
waited += schemaAgreementSleepTime;
if (waited > ringDelay)
throw new RuntimeException("Could not reach schema agreement in " + ringDelay + "ms");
}
}
}
Expand Down
Expand Up @@ -44,6 +44,8 @@ public final class CassandraHostConfigurator implements Serializable {
private boolean runAutoDiscoveryAtStartup = false;
private boolean useSocketKeepalive = false;
private HOpTimer opTimer = new NullOpTimer();
private int ringDelay = AbstractCluster.RING_DELAY_DEF;
private int schemaAgreementSleepTime = AbstractCluster.WAIT_FOR_SCHEMA_AGREEMENT_SLEEP_TIME;

public CassandraHostConfigurator() {
this.hosts = null;
Expand Down Expand Up @@ -316,6 +318,30 @@ public void setUseSocketKeepalive(boolean useSocketKeepalive) {
this.useSocketKeepalive = useSocketKeepalive;
}

/**
* Delay in ms for which we will assume the ring has stabilized after a schema modification
* @return
*/
public int getRingDelay() {
return ringDelay;
}

public void setRingDelay(int ringDelay) {
this.ringDelay = ringDelay;
}

/**
* The amount of time in ms to wait for schema agreements
* @return
*/
public int getSchemaAgreementSleepTime() {
return schemaAgreementSleepTime;
}

public void setSchemaAgreementSleepTime(int schemaAgreementSleepTime) {
this.schemaAgreementSleepTime = schemaAgreementSleepTime;
}



}

0 comments on commit 7698127

Please sign in to comment.