Browse files

Add new operation verifyLastInsert

It inserts sequential columns and expect to
read the last value. It all needs to happen in
Quorum and it is useful to use it in a at least 3 nodes
cluster and single thread.
  • Loading branch information...
1 parent 8c9cb72 commit e8a7664140c0b1fbe9375f66cb4b3a7523c62ee3 @patricioe patricioe committed Nov 1, 2011
View
8 src/main/java/com/riptano/cassandra/stress/CommandFactory.java
@@ -1,7 +1,12 @@
package com.riptano.cassandra.stress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class CommandFactory {
+
+ private static Logger log = LoggerFactory.getLogger(CommandFactory.class);
public static StressCommand getInstance(int startKey, CommandArgs commandArgs, CommandRunner commandRunner) {
switch(commandArgs.getOperation()) {
@@ -13,7 +18,10 @@ public static StressCommand getInstance(int startKey, CommandArgs commandArgs, C
return new RangeSliceCommand(startKey, commandArgs, commandRunner);
case MULTIGET:
return new MultigetSliceCommand(startKey, commandArgs, commandRunner);
+ case VERIFY_LAST_INSERT:
+ return new VerifyLastInsertCommand(startKey, commandArgs, commandRunner);
};
+ log.info("Runnig default Insert command...");
return new InsertCommand(startKey, commandArgs, commandRunner);
}
}
View
2 src/main/java/com/riptano/cassandra/stress/CommandRunner.java
@@ -87,6 +87,8 @@ private StressCommand getCommandInstance(int startKeyArg, CommandArgs commandArg
return new RangeSliceCommand(startKey, commandArgs, commandRunner);
case MULTIGET:
return new MultigetSliceCommand(startKey, commandArgs, commandRunner);
+ case VERIFY_LAST_INSERT:
+ return new VerifyLastInsertCommand(startKey, commandArgs, commandRunner);
};
return new InsertCommand(startKey, commandArgs, commandRunner);
}
View
3 src/main/java/com/riptano/cassandra/stress/Operation.java
@@ -5,7 +5,8 @@
READ("read"),
RANGESLICE("rangeslice"),
MULTIGET("multiget"),
- REPLAY("replay");
+ REPLAY("replay"),
+ VERIFY_LAST_INSERT("verifylastinsert");
private final String op;
View
2 src/main/java/com/riptano/cassandra/stress/RangeSliceCommand.java
@@ -1,7 +1,5 @@
package com.riptano.cassandra.stress;
-import java.util.concurrent.CountDownLatch;
-
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.factory.HFactory;
View
98 src/main/java/com/riptano/cassandra/stress/VerifyLastInsertCommand.java
@@ -0,0 +1,98 @@
+package com.riptano.cassandra.stress;
+
+import me.prettyprint.cassandra.serializers.StringSerializer;
+import me.prettyprint.hector.api.beans.ColumnSlice;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.MutationResult;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.QueryResult;
+import me.prettyprint.hector.api.query.SliceQuery;
+
+import org.apache.cassandra.utils.LatencyTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Single threaded operation.
+ * Writes an integer, and read it back to verify it exists.
+ *
+ * Read/Write CL = Quorum.
+ *
+ * @author patricioe (Patricio Echague - patricio@datastax.com)
+ *
+ */
+public class VerifyLastInsertCommand extends StressCommand {
+
+ private static Logger log = LoggerFactory.getLogger(VerifyLastInsertCommand.class);
+
+ protected final Mutator<String> mutator;
+ private final SliceQuery<String, String, String> sliceQuery;
+ private StringSerializer se = StringSerializer.get();
+
+ public VerifyLastInsertCommand(int startKey, CommandArgs commandArgs, CommandRunner commandRunner) {
+ super(startKey, commandArgs, commandRunner);
+ verifyCondition();
+ mutator = HFactory.createMutator(commandArgs.keyspace, StringSerializer.get());
+ sliceQuery = HFactory.createSliceQuery(commandArgs.keyspace, se, se, se);
+ }
+
+ private void verifyCondition() {
+ if (commandArgs.threads != 1) {
+ log.error("VerifyLastInsertCommand should run single threaded. \"-t 1\"");
+ throw new RuntimeException("Total threads should be 1.");
+ }
+ }
+
+ @Override
+ public Void call() throws Exception {
+
+ String key = "test";
+ sliceQuery.setColumnFamily(commandArgs.workingColumnFamily);
+
+ log.info("StartKey: {} for thread {}", key, Thread.currentThread().getId());
+ String colValue;
+
+ for (int col = 0; col < commandArgs.columnCount; col++) {
+ colValue = String.format(COLUMN_VAL_FORMAT, col);
+ mutator.addInsertion(key,
+ commandArgs.workingColumnFamily,
+ HFactory.createStringColumn(String.format(COLUMN_NAME_FORMAT, col),
+ colValue));
+ executeMutator(col);
+
+ // Let's verify
+ sliceQuery.setKey(key);
+ sliceQuery.setRange(null, null, true, 1);
+ QueryResult<ColumnSlice<String,String>> result = sliceQuery.execute();
+ String actualValue = result.get().getColumns().get(0).getValue();
+ if (!actualValue.equals(colValue)) {
+ log.error("Column values don't match. Expected: " + colValue + " - Actual: " + actualValue);
+ break;
+ }
+ }
+
+ commandRunner.doneSignal.countDown();
+ return null;
+ }
+
+ private void executeMutator(int cols) {
+ try {
+ MutationResult mr = mutator.execute();
+ // could be null here when our batch size is zero
+ if ( mr.getHostUsed() != null ) {
+ LatencyTracker writeCount = commandRunner.latencies.get(mr.getHostUsed());
+ if ( writeCount != null )
+ writeCount.addMicro(mr.getExecutionTimeMicro());
+ }
+ mutator.discardPendingMutations();
+
+ log.info("executed batch of {}. {} of {} complete", new Object[]{commandArgs.batchSize, cols, commandArgs.getKeysPerThread()});
+
+ } catch (Exception ex){
+ log.error("Problem executing insert:",ex);
+ }
+ }
+
+ private static final String COLUMN_VAL_FORMAT = "%08d_%s";
+ private static final String COLUMN_NAME_FORMAT = "col_%08d";
+}

0 comments on commit e8a7664

Please sign in to comment.