Skip to content

Commit

Permalink
Merge pull request #3 from zznate/3345
Browse files Browse the repository at this point in the history
3345
  • Loading branch information
Nate McCall committed Nov 1, 2011
2 parents 8c9cb72 + fdb1d49 commit 7d998f2
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 4 deletions.
4 changes: 3 additions & 1 deletion README.mdown
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ This will perform the operation then drop you into a shell in which you can perf
[cassandra-stress] [cassandra-stress]
usage: operation [options] usage: operation [options]
operation can be one of: insert, read, rangeslice, multiget, replay [N] operation can be one of: insert, read, rangeslice, multiget, replay [N]
-o,--operation <arg> One of insert, read, rangeslice, multiget -o,--operation <arg> One of insert, read, rangeslice, multiget, verify_last_insert (1)
-b,--batch-size <arg> The number of rows in the batch_mutate call -b,--batch-size <arg> The number of rows in the batch_mutate call
-t,--threads <arg> The number of client threads we will create -t,--threads <arg> The number of client threads we will create
-c,--columns <arg> The number of columsn to create per key -c,--columns <arg> The number of columsn to create per key
-C,--clients <arg> The number of pooled clients to use -C,--clients <arg> The number of pooled clients to use


To re-run the operation again, just type the operation name. So from the initial example above, re-running the read operation with the same parameters would be: To re-run the operation again, just type the operation name. So from the initial example above, re-running the read operation with the same parameters would be:


(1) Must run single thread (-t 1) and QUORUM:QUORUM. Intended to test in a cluster of at least 3 nodes RF = 3.

`[cassanra-stress] read` `[cassanra-stress] read`




Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -1,7 +1,12 @@
package com.riptano.cassandra.stress; package com.riptano.cassandra.stress;


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;



public class CommandFactory { public class CommandFactory {

private static Logger log = LoggerFactory.getLogger(CommandFactory.class);


public static StressCommand getInstance(int startKey, CommandArgs commandArgs, CommandRunner commandRunner) { public static StressCommand getInstance(int startKey, CommandArgs commandArgs, CommandRunner commandRunner) {
switch(commandArgs.getOperation()) { switch(commandArgs.getOperation()) {
Expand All @@ -13,7 +18,10 @@ public static StressCommand getInstance(int startKey, CommandArgs commandArgs, C
return new RangeSliceCommand(startKey, commandArgs, commandRunner); return new RangeSliceCommand(startKey, commandArgs, commandRunner);
case MULTIGET: case MULTIGET:
return new MultigetSliceCommand(startKey, commandArgs, commandRunner); return new MultigetSliceCommand(startKey, commandArgs, commandRunner);
case VERIFY_LAST_INSERT:
return new VerifyLastInsertCommand(startKey, commandArgs, commandRunner);
}; };
log.info("Runnig default Insert command...");
return new InsertCommand(startKey, commandArgs, commandRunner); return new InsertCommand(startKey, commandArgs, commandRunner);
} }
} }
2 changes: 2 additions & 0 deletions src/main/java/com/riptano/cassandra/stress/CommandRunner.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ private StressCommand getCommandInstance(int startKeyArg, CommandArgs commandArg
return new RangeSliceCommand(startKey, commandArgs, commandRunner); return new RangeSliceCommand(startKey, commandArgs, commandRunner);
case MULTIGET: case MULTIGET:
return new MultigetSliceCommand(startKey, commandArgs, commandRunner); return new MultigetSliceCommand(startKey, commandArgs, commandRunner);
case VERIFY_LAST_INSERT:
return new VerifyLastInsertCommand(startKey, commandArgs, commandRunner);
}; };
return new InsertCommand(startKey, commandArgs, commandRunner); return new InsertCommand(startKey, commandArgs, commandRunner);
} }
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/riptano/cassandra/stress/Operation.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ public enum Operation {
READ("read"), READ("read"),
RANGESLICE("rangeslice"), RANGESLICE("rangeslice"),
MULTIGET("multiget"), MULTIGET("multiget"),
REPLAY("replay"); REPLAY("replay"),
VERIFY_LAST_INSERT("verifylastinsert");


private final String op; private final String op;


Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.riptano.cassandra.stress; package com.riptano.cassandra.stress;


import java.util.concurrent.CountDownLatch;

import me.prettyprint.cassandra.serializers.StringSerializer; import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.beans.OrderedRows; import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.factory.HFactory; import me.prettyprint.hector.api.factory.HFactory;
Expand Down
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,100 @@
package com.riptano.cassandra.stress;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.MutationResult;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;

import org.apache.cassandra.utils.LatencyTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Single threaded operation.
* Writes an integer, and read it back to verify it exists.
*
* Read/Write CL = Quorum.
*
* @author patricioe (Patricio Echague - patricio@datastax.com)
*
*/
public class VerifyLastInsertCommand extends StressCommand {

private static Logger log = LoggerFactory.getLogger(VerifyLastInsertCommand.class);

protected final Mutator<String> mutator;
private final SliceQuery<String, String, String> sliceQuery;
private StringSerializer se = StringSerializer.get();

public VerifyLastInsertCommand(int startKey, CommandArgs commandArgs, CommandRunner commandRunner) {
super(startKey, commandArgs, commandRunner);
verifyCondition();
mutator = HFactory.createMutator(commandArgs.keyspace, StringSerializer.get());
sliceQuery = HFactory.createSliceQuery(commandArgs.keyspace, se, se, se);
}

private void verifyCondition() {
if (commandArgs.threads != 1) {
log.error("VerifyLastInsertCommand should run single threaded. \"-t 1\"");
throw new RuntimeException("Total threads should be 1.");
}
}

@Override
public Void call() throws Exception {
log.debug("Starting VerifyLastInsertCommand");
String key = "test";
sliceQuery.setColumnFamily(commandArgs.workingColumnFamily);

log.info("StartKey: {} for thread {}", key, Thread.currentThread().getId());
String colValue;

for (int col = 0; col < commandArgs.columnCount; col++) {
colValue = String.format(COLUMN_VAL_FORMAT, col);
mutator.addInsertion(key,
commandArgs.workingColumnFamily,
HFactory.createStringColumn(String.format(COLUMN_NAME_FORMAT, col),
colValue));
executeMutator(col);

// Let's verify
sliceQuery.setKey(key);
sliceQuery.setRange(null, null, true, 1);
QueryResult<ColumnSlice<String,String>> result = sliceQuery.execute();
String actualValue = result.get().getColumns().get(0).getValue();

if (!actualValue.equals(colValue)) {
log.error("Column values don't match. Expected: " + colValue + " - Actual: " + actualValue);
break;
}
}

commandRunner.doneSignal.countDown();
log.debug("VerifyLastInsertCommand complete");
return null;
}

private void executeMutator(int cols) {
try {
MutationResult mr = mutator.execute();
// could be null here when our batch size is zero
if ( mr.getHostUsed() != null ) {
LatencyTracker writeCount = commandRunner.latencies.get(mr.getHostUsed());
if ( writeCount != null )
writeCount.addMicro(mr.getExecutionTimeMicro());
}
mutator.discardPendingMutations();

log.info("executed batch of {}. {} of {} complete", new Object[]{commandArgs.batchSize, cols, commandArgs.getKeysPerThread()});

} catch (Exception ex){
log.error("Problem executing insert:",ex);
}
}

private static final String COLUMN_VAL_FORMAT = "%08d";
private static final String COLUMN_NAME_FORMAT = "%08d";
}

0 comments on commit 7d998f2

Please sign in to comment.