Skip to content

Commit

Permalink
Generalized IoThrottler to EventThrottler.
Browse files Browse the repository at this point in the history
  • Loading branch information
bbansal committed Oct 7, 2009
1 parent 9e0ba42 commit fd3466a
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.log4j.Logger;

import voldemort.store.readonly.FileFetcher;
import voldemort.utils.IoThrottler;
import voldemort.utils.EventThrottler;
import voldemort.utils.Props;
import voldemort.utils.Time;
import voldemort.utils.Utils;
Expand Down Expand Up @@ -84,9 +84,9 @@ public File fetch(String fileUrl) throws IOException {
Configuration config = new Configuration();
config.setInt("io.file.buffer.size", bufferSize);
FileSystem fs = path.getFileSystem(config);
IoThrottler throttler = null;
EventThrottler throttler = null;
if(maxBytesPerSecond != null)
throttler = new IoThrottler(maxBytesPerSecond);
throttler = new EventThrottler(maxBytesPerSecond);

// copy file
CopyStats stats = new CopyStats();
Expand All @@ -95,7 +95,7 @@ public File fetch(String fileUrl) throws IOException {
return destination;
}

private void fetch(FileSystem fs, Path source, File dest, IoThrottler throttler, CopyStats stats)
private void fetch(FileSystem fs, Path source, File dest, EventThrottler throttler, CopyStats stats)
throws IOException {
if(fs.isFile(source)) {
copyFile(fs, source, dest, throttler, stats);
Expand All @@ -122,7 +122,7 @@ private void fetch(FileSystem fs, Path source, File dest, IoThrottler throttler,
private void copyFile(FileSystem fs,
Path source,
File dest,
IoThrottler throttler,
EventThrottler throttler,
CopyStats stats) throws IOException {
logger.info("Starting copy of " + source + " to " + dest);
FSDataInputStream input = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import voldemort.utils.ByteBufferBackedInputStream;
import voldemort.utils.ByteUtils;
import voldemort.utils.ClosableIterator;
import voldemort.utils.IoThrottler;
import voldemort.utils.EventThrottler;
import voldemort.utils.Pair;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;
Expand Down Expand Up @@ -249,7 +249,7 @@ private StorageEngine<ByteArray, byte[]> readStorageEngine(DataInputStream input
private void handleUpdateEntries(StorageEngine<ByteArray, byte[]> engine,
DataInputStream inputStream,
DataOutputStream outputStream) throws IOException {
IoThrottler throttler = new IoThrottler(streamMaxBytesWritesPerSec);
EventThrottler throttler = new EventThrottler(streamMaxBytesWritesPerSec);

try {
int keySize = inputStream.readInt();
Expand Down Expand Up @@ -313,7 +313,7 @@ private void handleGetPartitionsAsStream(StorageEngine<ByteArray, byte[]> engine
}

RoutingStrategy routingStrategy = new RoutingStrategyFactory(metadata.getCurrentCluster()).getRoutingStrategy(metadata.getStoreDef(engine.getName()));
IoThrottler throttler = new IoThrottler(streamMaxBytesReadPerSec);
EventThrottler throttler = new EventThrottler(streamMaxBytesReadPerSec);
try {
/**
* TODO HIGH: This way to iterate over all keys is not optimal
Expand Down
6 changes: 3 additions & 3 deletions src/java/voldemort/server/scheduler/DataCleanupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import voldemort.store.StorageEngine;
import voldemort.utils.ClosableIterator;
import voldemort.utils.IoThrottler;
import voldemort.utils.EventThrottler;
import voldemort.utils.Pair;
import voldemort.utils.Time;
import voldemort.utils.Utils;
Expand All @@ -43,13 +43,13 @@ public class DataCleanupJob<K, V> implements Runnable {
private final Semaphore cleanupPermits;
private final long maxAgeMs;
private final Time time;
private final IoThrottler throttler;
private final EventThrottler throttler;

public DataCleanupJob(StorageEngine<K, V> store,
Semaphore cleanupPermits,
long maxAgeMs,
Time time,
IoThrottler throttler) {
EventThrottler throttler) {
this.store = Utils.notNull(store);
this.cleanupPermits = Utils.notNull(cleanupPermits);
this.maxAgeMs = maxAgeMs;
Expand Down
6 changes: 3 additions & 3 deletions src/java/voldemort/server/storage/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import voldemort.store.versioned.InconsistencyResolvingStore;
import voldemort.utils.ByteArray;
import voldemort.utils.ConfigurationException;
import voldemort.utils.IoThrottler;
import voldemort.utils.EventThrottler;
import voldemort.utils.JmxUtils;
import voldemort.utils.ReflectUtils;
import voldemort.utils.SystemTime;
Expand Down Expand Up @@ -254,7 +254,7 @@ private void scheduleCleanupJob(StoreDefinition storeDef,
+ "' at " + startTime + " with retention scan throttle rate:" + maxReadRate
+ " Entries/second.");

IoThrottler throttler = new IoThrottler(maxReadRate);
EventThrottler throttler = new EventThrottler(maxReadRate);

Runnable cleanupJob = new DataCleanupJob<ByteArray, byte[]>(engine,
cleanupPermits,
Expand Down Expand Up @@ -376,7 +376,7 @@ public void forceCleanupOldDataThrottled(String storeName, int entryScanThrottle
storeDef.getRetentionDays()
* Time.MS_PER_DAY,
SystemTime.INSTANCE,
new IoThrottler(entryScanThrottleRate)));
new EventThrottler(entryScanThrottleRate)));
} else {
logger.error("forceCleanupOldData() No permit available to run cleanJob already running multiple instance."
+ engine.getName());
Expand Down
75 changes: 75 additions & 0 deletions src/java/voldemort/utils/EventThrottler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package voldemort.utils;

import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.annotations.concurrency.NotThreadsafe;

/**
* A class to throttle Events to a certain rate
*
* This class takes a maximum rate in events/sec and a minimum interval in ms at
* which to check the rate. The rate is checked every time the interval
* ellapses, and if the events rate exceeds the maximum, the call will block
* long enough to equalize it.
*
* This is generalized IoThrottler as it existed before, you can use it to
* throttle on Bytes read/write,number of entries scanned etc.
*
* @author jay
*
*/
@NotThreadsafe
public class EventThrottler {

private final static Logger logger = Logger.getLogger(EventThrottler.class);
private final static long DEFAULT_CHECK_INTERVAL_MS = 50;

private final Time time;
private final long ratesPerSecond;
private final long intervalMs;
private long startTime;
private long eventsSeenInLastInterval;

public EventThrottler(long ratesPerSecond) {
this(SystemTime.INSTANCE, ratesPerSecond, DEFAULT_CHECK_INTERVAL_MS);
}

public EventThrottler(Time time, long ratePerSecond, long intervalMs) {
this.time = time;
this.intervalMs = intervalMs;
this.ratesPerSecond = ratePerSecond;
this.eventsSeenInLastInterval = 0L;
this.startTime = 0L;
}

public void maybeThrottle(int eventsSeen) {
eventsSeenInLastInterval += eventsSeen;
long now = time.getNanoseconds();
long ellapsedNs = now - startTime;
// if we have completed an interval AND we have seen some events, maybe
// we should take a little nap
if(ellapsedNs > intervalMs * Time.NS_PER_MS && eventsSeenInLastInterval > 0) {
long eventsPerSec = (eventsSeenInLastInterval * Time.NS_PER_SECOND) / ellapsedNs;
if(eventsPerSec > ratesPerSecond) {
// solve for the amount of time to sleep to make us hit the
// correct i/o rate
double maxEventsPerMs = ratesPerSecond / (double) Time.MS_PER_SECOND;
long ellapsedMs = ellapsedNs / Time.NS_PER_MS;
long sleepTime = Math.round(eventsSeenInLastInterval / maxEventsPerMs - ellapsedMs);
if(logger.isDebugEnabled())
logger.debug("Natural I/O rate is " + eventsPerSec
+ " bytes/sec, sleeping for " + sleepTime + " ms to compensate.");
if(sleepTime > 0) {
try {
time.sleep(sleepTime);
} catch(InterruptedException e) {
throw new VoldemortException(e);
}
}
}
startTime = now;
eventsSeenInLastInterval = 0;
}
}
}
72 changes: 0 additions & 72 deletions src/java/voldemort/utils/IoThrottler.java

This file was deleted.

4 changes: 2 additions & 2 deletions test/unit/voldemort/scheduled/DataCleanupJobTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import voldemort.server.scheduler.DataCleanupJob;
import voldemort.store.StorageEngine;
import voldemort.store.memory.InMemoryStorageEngine;
import voldemort.utils.IoThrottler;
import voldemort.utils.EventThrottler;
import voldemort.utils.Time;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Versioned;
Expand Down Expand Up @@ -55,7 +55,7 @@ public void testCleanupCleansUp() {
new Semaphore(1),
Time.MS_PER_DAY,
time,
new IoThrottler(1)).run();
new EventThrottler(1)).run();

// Check that all the later keys are there AND the key updated later
assertContains("a", "d", "e", "f");
Expand Down
2 changes: 1 addition & 1 deletion test/unit/voldemort/utils/IoThrottlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void testThrottler() {
public void testThrottler(int readSize, int numReads, long readTime, long throttledRate) {
long startTime = 1000;
MockTime time = new MockTime(startTime);
IoThrottler throttler = new IoThrottler(time, throttledRate, 50);
EventThrottler throttler = new EventThrottler(time, throttledRate, 50);
for(int i = 0; i < numReads; i++) {
time.addMilliseconds(readTime);
throttler.maybeThrottle(readSize);
Expand Down

0 comments on commit fd3466a

Please sign in to comment.