Skip to content

Commit

Permalink
extract rateLimiter use from operation
Browse files Browse the repository at this point in the history
- ready now returns partitionCount
- extract rateLimiter/opsDistribution/workManager interaction into an
operation stream class
This is done in preparation for replacing rate limiter with a similar
mechanism that will expose the operation intended time.
  • Loading branch information
nitsanw committed May 16, 2016
1 parent 5974ace commit 8fb6f89
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 29 deletions.
Expand Up @@ -48,7 +48,7 @@ public static interface RunOp
public int rowCount();
}

public abstract boolean ready(WorkManager permits, RateLimiter rateLimiter);
public abstract int ready(WorkManager permits);

public boolean isWrite()
{
Expand Down
65 changes: 49 additions & 16 deletions tools/stress/src/org/apache/cassandra/stress/StressAction.java
Expand Up @@ -25,17 +25,18 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;

import org.apache.cassandra.stress.operations.OpDistribution;
import org.apache.cassandra.stress.operations.OpDistributionFactory;
import org.apache.cassandra.stress.settings.ConnectionAPI;
import org.apache.cassandra.stress.settings.SettingsCommand;
import org.apache.cassandra.stress.settings.StressSettings;
import org.apache.cassandra.stress.util.JavaDriverClient;
import org.apache.cassandra.stress.util.ThriftClient;
import org.apache.cassandra.transport.SimpleClient;

import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;

public class StressAction implements Runnable
{

Expand All @@ -58,6 +59,7 @@ public void run()

if (!settings.command.noWarmup)
warmup(settings.command.getFactory(settings));

if (settings.command.truncate == SettingsCommand.TruncateWhen.ONCE)
settings.command.truncateTables(settings);

Expand Down Expand Up @@ -262,14 +264,43 @@ else if (opCount <= 0)
return metrics;
}

private static class StreamOfOperations {
private final OpDistribution operations;
private final RateLimiter rateLimiter;
private final WorkManager workManager;
public StreamOfOperations(OpDistribution operations, RateLimiter rateLimiter, WorkManager workManager)
{
this.operations = operations;
this.rateLimiter = rateLimiter;
this.workManager = workManager;
}

Operation nextOp()
{
Operation op = operations.next();
final int partitionCount = op.ready(workManager);
if (partitionCount == 0)
return null;
if (rateLimiter != null)
rateLimiter.acquire(partitionCount);
return op;
}

void close()
{
operations.closeTimers();
}

void abort()
{
workManager.stop();
}
}
private class Consumer extends Thread
{

private final OpDistribution operations;
private final StreamOfOperations opStream;
private final StressMetrics metrics;
private final RateLimiter rateLimiter;
private volatile boolean success = true;
private final WorkManager workManager;
private final CountDownLatch done;

public Consumer(OpDistribution operations,
Expand All @@ -279,10 +310,8 @@ public Consumer(OpDistribution operations,
RateLimiter rateLimiter)
{
this.done = done;
this.rateLimiter = rateLimiter;
this.workManager = workManager;
this.metrics = metrics;
this.operations = operations;
this.opStream = new StreamOfOperations(operations, rateLimiter, workManager);
}

public void run()
Expand All @@ -293,7 +322,9 @@ public void run()
ThriftClient tclient = null;
JavaDriverClient jclient = null;

switch (settings.mode.api)

final ConnectionAPI clientType = settings.mode.api;
switch (clientType)
{
case JAVA_DRIVER_NATIVE:
jclient = settings.getJavaDriverClient();
Expand All @@ -311,13 +342,14 @@ public void run()

while (true)
{
Operation op = operations.next();
if (!op.ready(workManager, rateLimiter))
// Assumption: All ops are thread local, operations are never shared across threads.
Operation op = opStream.nextOp();
if (op == null)
break;

try
{
switch (settings.mode.api)
switch (clientType)
{
case JAVA_DRIVER_NATIVE:
op.run(jclient);
Expand All @@ -339,7 +371,8 @@ public void run()
e.printStackTrace(output);

success = false;
workManager.stop();
opStream.abort();

metrics.cancel();
return;
}
Expand All @@ -353,7 +386,7 @@ public void run()
finally
{
done.countDown();
operations.closeTimers();
opStream.close();
}
}
}
Expand Down
Expand Up @@ -77,14 +77,14 @@ public PartitionOperation(Timer timer, StressSettings settings, DataSpec spec)
this.spec = spec;
}

public boolean ready(WorkManager permits, RateLimiter rateLimiter)
public int ready(WorkManager permits)
{
int partitionCount = (int) spec.partitionCount.next();
if (partitionCount <= 0)
return false;
return 0;
partitionCount = permits.takePermits(partitionCount);
if (partitionCount <= 0)
return false;
return 0;

int i = 0;
boolean success = true;
Expand All @@ -105,11 +105,8 @@ public boolean ready(WorkManager permits, RateLimiter rateLimiter)
}
partitionCount = i;

if (rateLimiter != null)
rateLimiter.acquire(partitionCount);

partitions = partitionCache.subList(0, partitionCount);
return !partitions.isEmpty();
return partitions.size();
}

protected boolean reset(Seed seed, PartitionIterator iterator)
Expand Down
Expand Up @@ -248,18 +248,18 @@ public void run(ThriftClient client) throws IOException
timeWithRetry(new ThriftRun(client));
}

public boolean ready(WorkManager workManager, RateLimiter rateLimiter)
public int ready(WorkManager workManager)
{
tokenRangeIterator.update();

if (tokenRangeIterator.exhausted() && currentState.get() == null)
return false;
return 0;

int numLeft = workManager.takePermits(1);
if (rateLimiter != null && numLeft > 0 )
rateLimiter.acquire(numLeft);
// if (rateLimiter != null && numLeft > 0 )
// rateLimiter.acquire(numLeft);

return numLeft > 0;
return numLeft > 0 ? 1 : 0;
}

public String key()
Expand Down

0 comments on commit 8fb6f89

Please sign in to comment.