Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
fix for issue hector-client#223, cassandra-all should be a test depen…
Browse files Browse the repository at this point in the history
…dency
  • Loading branch information
sbridges committed Aug 12, 2011
1 parent 7c57f0d commit 1406c96
Show file tree
Hide file tree
Showing 19 changed files with 767 additions and 66 deletions.
12 changes: 7 additions & 5 deletions core/pom.xml
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>me.prettyprint</groupId>
<artifactId>hector</artifactId>
<version>0.8.0-2-SNAPSHOT</version>
<version>0.8.0-3-SNAPSHOT</version>
</parent>
<artifactId>hector-core</artifactId>
<packaging>bundle</packaging>
Expand Down Expand Up @@ -102,10 +102,6 @@
<artifactId>commons-pool</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-thrift</artifactId>
Expand All @@ -114,6 +110,12 @@
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</dependency>
<dependency>
<groupId>me.prettyprint</groupId>
<artifactId>hector-test</artifactId>
<version>${pom.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
Expand Down
Expand Up @@ -7,6 +7,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.utils.DaemonThreadPoolFactory;

public abstract class BackgroundCassandraHostService {

Expand All @@ -20,7 +21,7 @@ public abstract class BackgroundCassandraHostService {

public BackgroundCassandraHostService(HConnectionManager connectionManager,
CassandraHostConfigurator cassandraHostConfigurator) {
executor = Executors.newScheduledThreadPool(1, new DaemonThreadPoolFactory("Hector." + getClass().getSimpleName() + "Thread"));
executor = Executors.newScheduledThreadPool(1, new DaemonThreadPoolFactory(getClass()));
this.connectionManager = connectionManager;
this.cassandraHostConfigurator = cassandraHostConfigurator;

Expand All @@ -40,24 +41,7 @@ public void setRetryDelayInSeconds(int retryDelayInSeconds) {
this.retryDelayInSeconds = retryDelayInSeconds;
}

private static class DaemonThreadPoolFactory implements ThreadFactory {

private static final AtomicInteger count = new AtomicInteger();
private final String name;

public DaemonThreadPoolFactory(String name) {
this.name = name;
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName(name + "-" + count.incrementAndGet());
return t;
}

}


}

Expand Up @@ -8,11 +8,13 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.utils.DaemonThreadPoolFactory;

import org.apache.cassandra.concurrent.RetryingScheduledThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -29,7 +31,8 @@ public class DynamicLoadBalancingPolicy implements LoadBalancingPolicy {

private static final long serialVersionUID = -1044985880174118325L;
private static final Logger log = LoggerFactory.getLogger(DynamicLoadBalancingPolicy.class);
public static RetryingScheduledThreadPoolExecutor tasks = new RetryingScheduledThreadPoolExecutor("BGTasks");

private final ScheduledExecutorService tasks = new ScheduledThreadPoolExecutor(1, new DaemonThreadPoolFactory(getClass()));

// references which is used to make the real time requests faster.
private Map<HClientPool, Double> scores = Maps.newConcurrentMap();
Expand All @@ -45,15 +48,23 @@ public DynamicLoadBalancingPolicy() {
// Pre-calculate the scores so as we can compare it fast.
Runnable updateThread = new Runnable() {
public void run() {
updateScores();
try {
updateScores();
} catch(Exception e) {
log.info("exception updating scores", e);
}
}
};

// Clear Stats.
Runnable resetThread = new Runnable() {
public void run() {
for (LatencyAwareHClientPool pool : allPools) {
pool.clear();
try {
for (LatencyAwareHClientPool pool : allPools) {
pool.clear();
}
} catch(Exception e) {
log.info("exceotuib reseting stats", e);
}
}
};
Expand Down
@@ -0,0 +1,32 @@
package me.prettyprint.cassandra.constants;

/**
* Defaults
*/
public class CFMetaDataDefaults {
public final static double DEFAULT_ROW_CACHE_SIZE = 0.0;
public final static double DEFAULT_KEY_CACHE_SIZE = 200000;
public final static double DEFAULT_READ_REPAIR_CHANCE = 1.0;
public final static boolean DEFAULT_REPLICATE_ON_WRITE = true;
public final static int DEFAULT_SYSTEM_MEMTABLE_THROUGHPUT_IN_MB = 8;
public final static int DEFAULT_ROW_CACHE_SAVE_PERIOD_IN_SECONDS = 0;
public final static int DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS = 4 * 3600;
public final static int DEFAULT_GC_GRACE_SECONDS = 864000;
public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4;
public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32;
public final static int DEFAULT_MEMTABLE_LIFETIME_IN_MINS = 60 * 24;
public final static double DEFAULT_MERGE_SHARDS_CHANCE = 0.1;
//this defaults to ram / 16 / 1MB on the server
//but we are on the client, so we don't know how much
//ram is on the server, assume a conservative 8G
public final static int DEFAULT_MEMTABLE_THROUGHPUT_IN_MB = 500;
public final static double DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS =
sizeMemtableOperations(DEFAULT_MEMTABLE_THROUGHPUT_IN_MB);


private static double sizeMemtableOperations(int mem_throughput)
{
return 0.3 * mem_throughput / 64.0;
}

}
Expand Up @@ -10,7 +10,7 @@
import me.prettyprint.hector.api.ddl.ColumnType;
import me.prettyprint.hector.api.ddl.ComparatorType;

import org.apache.cassandra.config.CFMetaData;
import me.prettyprint.cassandra.constants.CFMetaDataDefaults;
import org.apache.cassandra.thrift.CfDef;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
Expand Down Expand Up @@ -59,15 +59,15 @@ public ThriftCfDef(CfDef d) {
defaultValidationClass = d.default_validation_class;
id = d.id;
minCompactionThreshold = d.min_compaction_threshold == 0 ?
CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD : d.min_compaction_threshold;
CFMetaDataDefaults.DEFAULT_MIN_COMPACTION_THRESHOLD : d.min_compaction_threshold;
maxCompactionThreshold = d.max_compaction_threshold == 0 ?
CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD : d.max_compaction_threshold;
CFMetaDataDefaults.DEFAULT_MAX_COMPACTION_THRESHOLD : d.max_compaction_threshold;
memtableOperationsInMillions = d.memtable_operations_in_millions == 0 ?
CFMetaData.DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS : d.memtable_operations_in_millions;
CFMetaDataDefaults.DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS : d.memtable_operations_in_millions;
memtableFlushAfterMins = d.memtable_flush_after_mins == 0 ?
CFMetaData.DEFAULT_MEMTABLE_LIFETIME_IN_MINS : d.memtable_flush_after_mins;
CFMetaDataDefaults.DEFAULT_MEMTABLE_LIFETIME_IN_MINS : d.memtable_flush_after_mins;
memtableThroughputInMb = d.memtable_throughput_in_mb == 0 ?
CFMetaData.DEFAULT_MEMTABLE_THROUGHPUT_IN_MB : d.memtable_throughput_in_mb;
CFMetaDataDefaults.DEFAULT_MEMTABLE_THROUGHPUT_IN_MB : d.memtable_throughput_in_mb;

replicateOnWrite = d.replicate_on_write;
}
Expand All @@ -90,15 +90,15 @@ public ThriftCfDef(ColumnFamilyDefinition columnFamilyDefinition) {
defaultValidationClass = columnFamilyDefinition.getDefaultValidationClass();
id = columnFamilyDefinition.getId();
minCompactionThreshold = columnFamilyDefinition.getMinCompactionThreshold() == 0 ?
CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD : columnFamilyDefinition.getMinCompactionThreshold();
CFMetaDataDefaults.DEFAULT_MIN_COMPACTION_THRESHOLD : columnFamilyDefinition.getMinCompactionThreshold();
maxCompactionThreshold = columnFamilyDefinition.getMaxCompactionThreshold() == 0 ?
CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD : columnFamilyDefinition.getMaxCompactionThreshold();
CFMetaDataDefaults.DEFAULT_MAX_COMPACTION_THRESHOLD : columnFamilyDefinition.getMaxCompactionThreshold();
memtableFlushAfterMins = columnFamilyDefinition.getMemtableFlushAfterMins() == 0 ?
CFMetaData.DEFAULT_MEMTABLE_LIFETIME_IN_MINS : columnFamilyDefinition.getMemtableFlushAfterMins();
CFMetaDataDefaults.DEFAULT_MEMTABLE_LIFETIME_IN_MINS : columnFamilyDefinition.getMemtableFlushAfterMins();
memtableThroughputInMb = columnFamilyDefinition.getMemtableThroughputInMb() == 0 ?
CFMetaData.DEFAULT_MEMTABLE_THROUGHPUT_IN_MB : columnFamilyDefinition.getMemtableThroughputInMb();
CFMetaDataDefaults.DEFAULT_MEMTABLE_THROUGHPUT_IN_MB : columnFamilyDefinition.getMemtableThroughputInMb();
memtableOperationsInMillions = columnFamilyDefinition.getMemtableOperationsInMillions() == 0 ?
CFMetaData.DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS : columnFamilyDefinition.getMemtableOperationsInMillions();
CFMetaDataDefaults.DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS : columnFamilyDefinition.getMemtableOperationsInMillions();
replicateOnWrite = columnFamilyDefinition.isReplicateOnWrite();
}

Expand All @@ -109,16 +109,16 @@ public ThriftCfDef(String keyspace, String columnFamilyName) {

columnType = ColumnType.STANDARD;
comparatorType = ComparatorType.BYTESTYPE;
readRepairChance = CFMetaData.DEFAULT_READ_REPAIR_CHANCE;
keyCacheSize = CFMetaData.DEFAULT_KEY_CACHE_SIZE;
keyCacheSavePeriodInSeconds = CFMetaData.DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS;
gcGraceSeconds = CFMetaData.DEFAULT_GC_GRACE_SECONDS;
minCompactionThreshold = CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD;
maxCompactionThreshold = CFMetaData.DEFAULT_MAX_COMPACTION_THRESHOLD;
memtableFlushAfterMins = CFMetaData.DEFAULT_MEMTABLE_LIFETIME_IN_MINS;
memtableThroughputInMb = CFMetaData.DEFAULT_MEMTABLE_THROUGHPUT_IN_MB;
memtableOperationsInMillions = CFMetaData.DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS;
replicateOnWrite = CFMetaData.DEFAULT_REPLICATE_ON_WRITE;
readRepairChance = CFMetaDataDefaults.DEFAULT_READ_REPAIR_CHANCE;
keyCacheSize = CFMetaDataDefaults.DEFAULT_KEY_CACHE_SIZE;
keyCacheSavePeriodInSeconds = CFMetaDataDefaults.DEFAULT_KEY_CACHE_SAVE_PERIOD_IN_SECONDS;
gcGraceSeconds = CFMetaDataDefaults.DEFAULT_GC_GRACE_SECONDS;
minCompactionThreshold = CFMetaDataDefaults.DEFAULT_MIN_COMPACTION_THRESHOLD;
maxCompactionThreshold = CFMetaDataDefaults.DEFAULT_MAX_COMPACTION_THRESHOLD;
memtableFlushAfterMins = CFMetaDataDefaults.DEFAULT_MEMTABLE_LIFETIME_IN_MINS;
memtableThroughputInMb = CFMetaDataDefaults.DEFAULT_MEMTABLE_THROUGHPUT_IN_MB;
memtableOperationsInMillions = CFMetaDataDefaults.DEFAULT_MEMTABLE_OPERATIONS_IN_MILLIONS;
replicateOnWrite = CFMetaDataDefaults.DEFAULT_REPLICATE_ON_WRITE;
}

public ThriftCfDef(String keyspace, String columnFamilyName, ComparatorType comparatorType) {
Expand Down
@@ -0,0 +1,33 @@
package me.prettyprint.cassandra.utils;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonThreadPoolFactory implements ThreadFactory {

private ConcurrentHashMap<String, AtomicInteger> counters =
new ConcurrentHashMap<String, AtomicInteger>();

private final String name;

public DaemonThreadPoolFactory(Class<?> parentClass) {
this.name = "Hector." + parentClass.getName();
}

private int getNextThreadNumber() {
if(!counters.containsKey(name)) {
counters.putIfAbsent(name, new AtomicInteger());
}
return counters.get(name).incrementAndGet();
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName(name + "-" + getNextThreadNumber());
return t;
}

}
Expand Up @@ -30,10 +30,10 @@
import me.prettyprint.cassandra.utils.ByteBufferOutputStream;
import me.prettyprint.hector.api.Serializer;

import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableClassToInstanceMap;
Expand Down Expand Up @@ -370,7 +370,8 @@ private String getComparator(int i, ByteBuffer bb) {
try {
int header = getShortLength(bb);
if ((header & 0x8000) == 0) {
name = ByteBufferUtil.string(getBytes(bb, header));

name = Charsets.UTF_8.newDecoder().decode(getBytes(bb, header).duplicate()).toString();
} else {
byte a = (byte) (header & 0xFF);
name = aliasToComparatorMapping.get(a);
Expand Down Expand Up @@ -667,7 +668,7 @@ public ByteBuffer serialize() {
out.writeShort((short) (0x8000 | a));
} else {
out.writeShort((short) comparator.length());
out.write(ByteBufferUtil.bytes(comparator));
out.write(ByteBuffer.wrap(comparator.getBytes(Charsets.UTF_8)));
}
// if (comparator.equals(BYTESTYPE.getTypeName()) && (cb.remaining() ==
// 0)) {
Expand Down
Expand Up @@ -4,7 +4,7 @@

import me.prettyprint.cassandra.connection.HConnectionManager;
import me.prettyprint.cassandra.service.CassandraHostConfigurator;
import me.prettyprint.cassandra.testutils.EmbeddedServerHelper;
import me.prettyprint.hector.testutils.EmbeddedServerHelper;

import org.apache.cassandra.config.ConfigurationException;
import org.apache.thrift.transport.TTransportException;
Expand Down
Expand Up @@ -11,9 +11,9 @@
import java.util.Map;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.testutils.EmbeddedServerHelper;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.testutils.EmbeddedServerHelper;

import org.apache.cassandra.config.ConfigurationException;
import org.apache.thrift.transport.TTransportException;
Expand Down
Expand Up @@ -13,6 +13,7 @@
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.RangeSlicesCounterQuery;
import me.prettyprint.hector.api.query.RangeSlicesQuery;

import org.junit.Before;
import org.junit.Test;

Expand Down
Expand Up @@ -22,12 +22,12 @@
import me.prettyprint.cassandra.connection.HConnectionManager;
import me.prettyprint.cassandra.model.QuorumAllConsistencyLevelPolicy;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.testutils.EmbeddedServerHelper;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.exceptions.HNotFoundException;
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.testutils.EmbeddedServerHelper;

import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.io.util.FileUtils;
Expand Down
Expand Up @@ -6,7 +6,6 @@
import java.util.Date;
import java.util.UUID;

import me.prettyprint.cassandra.BaseEmbededServerSetupTest;
import me.prettyprint.cassandra.service.clock.MicrosecondsClockResolution;
import me.prettyprint.cassandra.service.clock.MicrosecondsSyncClockResolution;
import me.prettyprint.hector.api.ClockResolution;
Expand Down
11 changes: 8 additions & 3 deletions object-mapper/pom.xml
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>me.prettyprint</groupId>
<artifactId>hector</artifactId>
<version>0.8.0-2-SNAPSHOT</version>
<version>0.8.0-3-SNAPSHOT</version>
</parent>
<artifactId>hector-object-mapper</artifactId>
<name>hector-object-mapper</name>
Expand Down Expand Up @@ -40,7 +40,7 @@
<dependency>
<groupId>me.prettyprint</groupId>
<artifactId>hector-core</artifactId>
<version>0.8.0-2-SNAPSHOT</version>
<version>0.8.0-3-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand All @@ -51,8 +51,13 @@
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>me.prettyprint</groupId>
<artifactId>hector-test</artifactId>
<version>0.8.0-3-SNAPSHOT</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
Expand Down

0 comments on commit 1406c96

Please sign in to comment.