Permalink
Browse files

added in bucketing counter spread operation

  • Loading branch information...
1 parent f4648c6 commit 5a5d3513bf1df982a487718933a9bbffca54dffc zznate committed Jan 23, 2012
View
148 src/main/java/com/riptano/cassandra/stress/BucketingCounterSpreadCommand.java
@@ -0,0 +1,148 @@
+package com.riptano.cassandra.stress;
+
+import java.util.TimeZone;
+
+import me.prettyprint.cassandra.model.HCounterColumnImpl;
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.cassandra.service.BatchSizeHint;
+import me.prettyprint.hector.api.beans.HCounterColumn;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.Mutator;
+import org.apache.commons.lang.time.FastDateFormat;
+
+/**
+ * Testing a scenario where HH *might* cause counter replication to loop
+ * on moderately sized clusters
+ *
+ * @author zznate
+ */
+public class BucketingCounterSpreadCommand extends StressCommand {
+
+ private static final long MINS_IN_YEAR = 525600L;
+ private static final long MINS_IN_MONTH = 43800L;
+ // more rounded version from 60 mins/hour * 7 days *
+ private static final long MINS_IN_WEEK = 10080L;
+ private static final long MINS_IN_DAY = 1440L;
+ private static final long MINS_IN_HOUR = 60L;
+
+ public BucketingCounterSpreadCommand(int startKey, CommandArgs commandArgs, CommandRunner commandRunner) {
+ super(startKey, commandArgs, commandRunner);
+ }
+
+ @Override
+ public Void call() throws Exception {
+
+ Mutator<String> counterMutator =
+ HFactory.createMutator(commandArgs.keyspace, StringSerializer.get(), new BatchSizeHint(500,2));
+ int x=0;
+ for(; x < MINS_IN_YEAR; x++) {
+ // build all bucket - 1 row
+ new CounterColumnBuilder(BucketType.ALL,x)
+ .applyClicks(1)
+ .applyView(1)
+ .addToMutation(counterMutator);
+
+ // 12 rows
+ new CounterColumnBuilder(BucketType.MONTH,x)
+ .applyClicks(1)
+ .applyView(1)
+ .addToMutation(counterMutator);
+
+ // 52 rows
+ new CounterColumnBuilder(BucketType.WEEK,x)
+ .applyClicks(1)
+ .applyView(1)
+ .addToMutation(counterMutator);
+
+ // 365 rows
+ new CounterColumnBuilder(BucketType.DAY,x)
+ .applyClicks(1)
+ .applyView(1)
+ .addToMutation(counterMutator);
+
+ // 8760 rows
+ new CounterColumnBuilder(BucketType.HOUR,x)
+ .applyClicks(1)
+ .applyView(1)
+ .addToMutation(counterMutator);
+
+ // 525600 rows
+ new CounterColumnBuilder(BucketType.MINUTE,x)
+ .applyClicks(1)
+ .applyView(1)
+ .addToMutation(counterMutator);
+
+ // TODO test≈ auto-batching here
+ if ( x % 500 == 0) {
+ executeMutator(counterMutator, x);
+ }
+ }
+ executeMutator(counterMutator,x);
+
+
+ return null;
+ }
+
+
+ static class CounterColumnBuilder {
+ private HCounterColumn<String> clicksCounter;
+ private HCounterColumn<String> viewsCounter;
+ private final String keyString;
+
+ CounterColumnBuilder(BucketType bucketType, long minInYear) {
+ this.keyString = bucketType.formatDate(minInYear * 60 * 1000);
+ }
+
+ CounterColumnBuilder applyClicks(long clicks) {
+ this.clicksCounter = new HCounterColumnImpl<String>("clicks",clicks,StringSerializer.get());
+ return this;
+ }
+
+ CounterColumnBuilder applyView(long views) {
+ this.viewsCounter = new HCounterColumnImpl<String>("views",views,StringSerializer.get());
+ return this;
+ }
+
+ void addToMutation(Mutator<String> mutator) {
+ mutator.addCounter(keyString,"CounterCf",clicksCounter);
+ mutator.addCounter(keyString,"CounterCf",viewsCounter);
+ }
+ }
+
+ /**
+ * Enum using commons-lang FastDateFormat. Parses a long key to a bucket value
+ * based on such
+ */
+ enum BucketType {
+ ALL("__ALL__"),
+ MONTH("YYYY_MM"),
+ WEEK("YYYY_MM_w"),
+ DAY("YYYY_MM_dd"),
+ HOUR("YYYY_MM_dd_hh"),
+ MINUTE("YYYY_MM_dd_hh_mm");
+
+ final FastDateFormat formatter;
+
+ BucketType(String format) {
+ this.formatter = FastDateFormat.getInstance(format, TimeZone.getTimeZone("GMT"));
+ }
+
+ /**
+ * Return the formatterd date based on the type. In the case of __ALL__
+ * we just return the format string, ignoring the date
+ * @param date
+ * @return
+ */
+ public String formatDate(long date) {
+ if (this == BucketType.ALL ) {
+ return toString();
+ }
+ return formatter.format(date);
+ }
+
+ @Override
+ public String toString() {
+ return formatter.getPattern();
+ }
+ }
+}
View
4 src/main/java/com/riptano/cassandra/stress/CommandRunner.java
@@ -71,7 +71,7 @@ private StressCommand getCommandInstance(int startKeyArg, CommandArgs commandArg
int startKey = commandArgs.startKey + startKeyArg;
if ( log.isDebugEnabled() ) {
- log.debug("Command requested with starting key pos {}", startKey);
+ log.debug("Command requested with starting key pos {} and op {}", startKey, commandArgs.getOperation());
}
Operation operation = commandArgs.getOperation();
@@ -89,6 +89,8 @@ private StressCommand getCommandInstance(int startKeyArg, CommandArgs commandArg
return new MultigetSliceCommand(startKey, commandArgs, commandRunner);
case VERIFY_LAST_INSERT:
return new VerifyLastInsertCommand(startKey, commandArgs, commandRunner);
+ case COUNTER_SPREAD:
+ return new BucketingCounterSpreadCommand(startKey, commandArgs, commandRunner);
};
return new InsertCommand(startKey, commandArgs, commandRunner);
}
View
25 src/main/java/com/riptano/cassandra/stress/InsertCommand.java
@@ -1,13 +1,8 @@
package com.riptano.cassandra.stress;
-import java.util.concurrent.CountDownLatch;
-
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.MutationResult;
import me.prettyprint.hector.api.mutation.Mutator;
-
-import org.apache.cassandra.utils.LatencyTracker;
import org.apache.commons.lang.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +38,7 @@ public Void call() throws Exception {
mutator.addInsertion(key, commandArgs.workingColumnFamily, HFactory.createStringColumn(String.format(COLUMN_NAME_FORMAT, j2),
String.format(COLUMN_VAL_FORMAT, j2, RandomStringUtils.random(colWidth))));
if ( j2 > 0 && j2 % commandArgs.batchSize == 0 ) {
- executeMutator(rows);
+ executeMutator(mutator, rows);
}
}
@@ -52,7 +47,7 @@ public Void call() throws Exception {
}
}
- executeMutator(rows);
+ executeMutator(mutator,rows);
}
commandRunner.doneSignal.countDown();
log.info("Last key was: {} for thread {}", key, Thread.currentThread().getId());
@@ -66,23 +61,7 @@ public Void call() throws Exception {
return null;
}
- private void executeMutator(int rows) {
- try {
- MutationResult mr = mutator.execute();
- // could be null here when our batch size is zero
- if ( mr.getHostUsed() != null ) {
- LatencyTracker writeCount = commandRunner.latencies.get(mr.getHostUsed());
- if ( writeCount != null )
- writeCount.addMicro(mr.getExecutionTimeMicro());
- }
- mutator.discardPendingMutations();
- log.info("executed batch of {}. {} of {} complete", new Object[]{commandArgs.batchSize, rows, commandArgs.getKeysPerThread()});
-
- } catch (Exception ex){
- log.error("Problem executing insert:",ex);
- }
- }
private static final String COLUMN_VAL_FORMAT = "%08d_%s";
private static final String COLUMN_NAME_FORMAT = "col_%08d";
View
3 src/main/java/com/riptano/cassandra/stress/Operation.java
@@ -6,7 +6,8 @@
RANGESLICE("rangeslice"),
MULTIGET("multiget"),
REPLAY("replay"),
- VERIFY_LAST_INSERT("verifylastinsert");
+ VERIFY_LAST_INSERT("verifylastinsert"),
+ COUNTERSPREAD("counterspread");
private final String op;
View
12 src/main/java/com/riptano/cassandra/stress/SliceCommand.java
@@ -1,26 +1,20 @@
package com.riptano.cassandra.stress;
import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.cassandra.utils.LatencyTracker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.service.CassandraHost;
import me.prettyprint.cassandra.service.HColumnFamilyImpl;
import me.prettyprint.hector.api.HColumnFamily;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SliceCommand extends StressCommand {
private static Logger log = LoggerFactory.getLogger(SliceCommand.class);
private final SliceQuery<String, String, String> sliceQuery;
+ // TODO replace with CFT!
private final HColumnFamily<String, String> columnFamily;
private static StringSerializer se = StringSerializer.get();
View
2 src/main/java/com/riptano/cassandra/stress/Stress.java
@@ -201,7 +201,7 @@ private void initializeCommandRunner(CommandLine cmd) throws Exception {
if ( commandArgs.validateCommand() && commandArgs.getOperation() != Operation.REPLAY) {
commandRunner.processCommand(commandArgs);
} else {
- throw new IllegalArgumentException();
+ throw new IllegalArgumentException("command: " + commandArgs.getOperation().toString());
}
}
View
28 src/main/java/com/riptano/cassandra/stress/StressCommand.java
@@ -2,7 +2,15 @@
import java.util.concurrent.Callable;
-public abstract class StressCommand implements Callable<Void> {
+import me.prettyprint.hector.api.mutation.MutationResult;
+import me.prettyprint.hector.api.mutation.Mutator;
+import org.apache.cassandra.utils.LatencyTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class StressCommand implements Callable<Void> {
+
+ private static Logger log = LoggerFactory.getLogger(StressCommand.class);
protected final CommandArgs commandArgs;
protected final int startKey;
@@ -14,4 +22,22 @@ public StressCommand(int startKey, CommandArgs commandArgs, CommandRunner comman
this.commandRunner = commandRunner;
}
+ protected void executeMutator(Mutator mutator, int rows) {
+ try {
+ MutationResult mr = mutator.execute();
+ // could be null here when our batch size is zero
+ if ( mr.getHostUsed() != null ) {
+ LatencyTracker writeCount = commandRunner.latencies.get(mr.getHostUsed());
+ if ( writeCount != null )
+ writeCount.addMicro(mr.getExecutionTimeMicro());
+ }
+ mutator.discardPendingMutations();
+
+ log.info("executed batch of {}. {} of {} complete", new Object[]{commandArgs.batchSize, rows, commandArgs.getKeysPerThread()});
+
+ } catch (Exception ex){
+ log.error("Problem executing insert:",ex);
+ }
+ }
+
}

0 comments on commit 5a5d351

Please sign in to comment.