Skip to content

Commit

Permalink
initial add of files
Browse files Browse the repository at this point in the history
  • Loading branch information
zznate committed Sep 29, 2010
0 parents commit c4a8c93
Show file tree
Hide file tree
Showing 11 changed files with 522 additions and 0 deletions.
89 changes: 89 additions & 0 deletions pom.xml
@@ -0,0 +1,89 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.riptano.cassandra.stress</groupId>
<artifactId>cassandra-stress</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>cassandra-stress</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<optimize>true</optimize>
<debug>true</debug>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<!--
run examples thusly:
mvn exec:java -Dexec.mainClass="com.riptano.cassandra.stress.Stress" -Dexec.args="-clients 10"
-->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.1</version>
<executions>
<execution>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>me.prettyprint</groupId>
<artifactId>hector</artifactId>
<version>0.7.0-17</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.5.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.5.8</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
</dependency>
</dependencies>

<repositories>
<repository>
<id>riptano</id>
<name>riptano</name>
<url>http://mvn.riptano.com/content/repositories/public/</url>
</repository>
</repositories>
</project>
31 changes: 31 additions & 0 deletions src/main/java/com/riptano/cassandra/stress/CommandArgs.java
@@ -0,0 +1,31 @@
package com.riptano.cassandra.stress;

import me.prettyprint.hector.api.Keyspace;

public class CommandArgs {

public Keyspace keyspace;
public int rowCount = DEF_INSERT_COUNT;
public int columnCount = DEF_COLUMN_COUNT;
public int batchSize = DEF_BATCH_SIZE;
public String operation = DEF_OPERATION;
public int clients = DEF_CLIENTS;

private static int DEF_CLIENTS = 50;
private static int DEF_INSERT_COUNT = 10000;
private static int DEF_BATCH_SIZE = 100;
private static int DEF_COLUMN_COUNT = 10;
private static String DEF_OPERATION = "insert";

public int getKeysPerThread() {
return rowCount / clients;
}



public Operation getOperation() {
return Operation.get(operation);
}


}
20 changes: 20 additions & 0 deletions src/main/java/com/riptano/cassandra/stress/CommandFactory.java
@@ -0,0 +1,20 @@
package com.riptano.cassandra.stress;

import java.util.concurrent.CountDownLatch;

public class CommandFactory {

public static StressCommand getInstance(int startKey, CommandArgs commandArgs, CountDownLatch countDownLatch) {
switch(commandArgs.getOperation()) {
case INSERT:
return new InsertCommand(startKey, commandArgs, countDownLatch);
case READ:
return new SliceCommand(startKey, commandArgs, countDownLatch);
case RANGE_SLICE:
return new RangeSliceCommand(startKey, commandArgs, countDownLatch);
case MULTI_GET:
// TODO
};
return new InsertCommand(startKey, commandArgs, countDownLatch);
}
}
59 changes: 59 additions & 0 deletions src/main/java/com/riptano/cassandra/stress/InsertCommand.java
@@ -0,0 +1,59 @@
package com.riptano.cassandra.stress;

import java.util.concurrent.CountDownLatch;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.MutationResult;
import me.prettyprint.hector.api.mutation.Mutator;

import org.apache.cassandra.utils.LatencyTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InsertCommand extends StressCommand {

private static Logger log = LoggerFactory.getLogger(InsertCommand.class);

protected final Mutator<String> mutator;

public InsertCommand(int startKey, CommandArgs commandArgs, CountDownLatch countDownLatch) {
super(startKey, commandArgs, countDownLatch);
mutator = HFactory.createMutator(commandArgs.keyspace, StringSerializer.get());
}

@Override
public Void call() throws Exception {

String key = null;
int rows = 0;
log.info("StartKey: {} for thread {}", startKey, Thread.currentThread().getId());
while (rows < commandArgs.getKeysPerThread()) {
for (int j = 0; j < commandArgs.batchSize; j++) {
key = String.format("%010d", rows+startKey);
for (int j2 = 0; j2 < commandArgs.columnCount; j2++) {
mutator.addInsertion(key, "Standard1", HFactory.createStringColumn("c"+j2, "value_"+j2));
}
rows++;
}

MutationResult mr = mutator.execute();
LatencyTracker writeCount = Stress.latencies.get(mr.getHostUsed());
writeCount.addMicro(mr.getExecutionTimeMicro());
mutator.discardPendingMutations();
log.info("executed batch of {}. {} of {} complete", new Object[]{commandArgs.batchSize, rows, commandArgs.getKeysPerThread()});
}
countDownLatch.countDown();
log.info("Last key was: {} for thread {}", key, Thread.currentThread().getId());
// while less than mutationBatchSize,
// - while less than rowCount
// - mutator.insert
// mutator.execute();


log.info("Executed chunk of {}. Latch now at {}", commandArgs.getKeysPerThread(), countDownLatch.getCount());
return null;
}


}
18 changes: 18 additions & 0 deletions src/main/java/com/riptano/cassandra/stress/Operation.java
@@ -0,0 +1,18 @@
package com.riptano.cassandra.stress;

public enum Operation {
INSERT("insert"),
READ("read"),
RANGESLICE("rangeslice"),
MULTIGET("multiget");

private final String op;

Operation(String val) {
this.op = val;
}

public static Operation get(String op) {
return Operation.valueOf(op.toUpperCase());
}
}
49 changes: 49 additions & 0 deletions src/main/java/com/riptano/cassandra/stress/RangeSliceCommand.java
@@ -0,0 +1,49 @@
package com.riptano.cassandra.stress;

import java.util.concurrent.CountDownLatch;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.RangeSlicesQuery;

import org.apache.cassandra.utils.LatencyTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RangeSliceCommand extends StressCommand {

private static Logger log = LoggerFactory.getLogger(RangeSliceCommand.class);

private final RangeSlicesQuery<String, String, String> rangeSlicesQuery;
private StringSerializer se = StringSerializer.get();

public RangeSliceCommand(int startKey, CommandArgs commandArgs,
CountDownLatch countDownLatch) {
super(startKey, commandArgs, countDownLatch);
rangeSlicesQuery = HFactory.createRangeSlicesQuery(commandArgs.keyspace, se, se, se);
}

@Override
public Void call() throws Exception {
int rows = 0;
rangeSlicesQuery.setColumnFamily("Standard1");
log.debug("Starting SliceCommand");
while (rows < commandArgs.getKeysPerThread()) {
for (int i = 0; i < commandArgs.batchSize; i++) {
rangeSlicesQuery.setKeys(String.format("%010d", startKey), String.format("%010d",startKey + commandArgs.getKeysPerThread()));
rangeSlicesQuery.setRange(null, null, false, commandArgs.columnCount);
QueryResult<OrderedRows<String,String,String>> result = rangeSlicesQuery.execute();
LatencyTracker readCount = Stress.latencies.get(result.getHostUsed());
readCount.addMicro(result.getExecutionTimeMicro());
rows++;
}
log.info("executed batch of {}. {} of {} complete", new Object[]{commandArgs.batchSize, rows, commandArgs.getKeysPerThread()});
}
countDownLatch.countDown();
log.debug("SliceCommand complete");
return null;
}

}
48 changes: 48 additions & 0 deletions src/main/java/com/riptano/cassandra/stress/SliceCommand.java
@@ -0,0 +1,48 @@
package com.riptano.cassandra.stress;

import java.util.Random;
import java.util.concurrent.CountDownLatch;

import org.apache.cassandra.utils.LatencyTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.QueryResult;
import me.prettyprint.hector.api.query.SliceQuery;

public class SliceCommand extends StressCommand {
private static Logger log = LoggerFactory.getLogger(SliceCommand.class);

private final SliceQuery<String, String, String> sliceQuery;

private static StringSerializer se = StringSerializer.get();

public SliceCommand(int startKey, CommandArgs commandArgs, CountDownLatch countDownLatch) {
super(startKey, commandArgs, countDownLatch);
sliceQuery = HFactory.createSliceQuery(commandArgs.keyspace, se, se, se);
}

@Override
public Void call() throws Exception {
int rows = 0;
Random random = new Random();
sliceQuery.setColumnFamily("Standard1");
log.debug("Starting SliceCommand");
while (rows < commandArgs.getKeysPerThread()) {
sliceQuery.setKey(String.format("%010d", startKey + random.nextInt(commandArgs.getKeysPerThread())));
sliceQuery.setRange(null, null, false, commandArgs.columnCount);
QueryResult<ColumnSlice<String,String>> result = sliceQuery.execute();
LatencyTracker readCount = Stress.latencies.get(result.getHostUsed());
readCount.addMicro(result.getExecutionTimeMicro());
rows++;
}
countDownLatch.countDown();
log.debug("SliceCommand complete");
return null;
}

}

0 comments on commit c4a8c93

Please sign in to comment.