Skip to content

Commit

Permalink
Initial changes for RO bandwidth throttler
Browse files Browse the repository at this point in the history
  • Loading branch information
Chinmay Soman committed Nov 30, 2011
1 parent ad05eaf commit bd927ff
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import voldemort.store.readonly.checksum.CheckSum;
import voldemort.store.readonly.checksum.CheckSum.CheckSumType;
import voldemort.utils.ByteUtils;
import voldemort.utils.EventThrottler;
import voldemort.utils.DynamicEventThrottler;
import voldemort.utils.JmxUtils;
import voldemort.utils.Props;
import voldemort.utils.Time;
Expand All @@ -66,7 +66,8 @@ public class HdfsFetcher implements FileFetcher {
private final int bufferSize;
private static final AtomicInteger copyCount = new AtomicInteger(0);
private AsyncOperationStatus status;
private EventThrottler throttler = null;
private DynamicEventThrottler throttler = null;
private long numJobs = 0;

public HdfsFetcher(Props props) {
this(props.containsKey("fetcher.max.bytes.per.sec") ? props.getBytes("fetcher.max.bytes.per.sec")
Expand All @@ -86,13 +87,17 @@ public HdfsFetcher() {
public HdfsFetcher(Long maxBytesPerSecond, Long reportingIntervalBytes, int bufferSize) {
this.maxBytesPerSecond = maxBytesPerSecond;
if(this.maxBytesPerSecond != null)
this.throttler = new EventThrottler(this.maxBytesPerSecond);
this.throttler = new DynamicEventThrottler(this.maxBytesPerSecond);
this.reportingIntervalBytes = Utils.notNull(reportingIntervalBytes);
this.bufferSize = bufferSize;
this.status = null;
}

public File fetch(String sourceFileUrl, String destinationFile) throws IOException {
synchronized(this) {
numJobs++;
throttler.updateRate(this.maxBytesPerSecond / numJobs);
}
Path path = new Path(sourceFileUrl);
Configuration config = new Configuration();
config.setInt("io.socket.receive.buffer", bufferSize);
Expand Down
12 changes: 9 additions & 3 deletions src/java/voldemort/utils/EventThrottler.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public EventThrottler(long ratesPerSecond) {
this(SystemTime.INSTANCE, ratesPerSecond, DEFAULT_CHECK_INTERVAL_MS);
}

public long getRate() {
return this.ratesPerSecond;
}

public EventThrottler(Time time, long ratePerSecond, long intervalMs) {
this.time = time;
this.intervalMs = intervalMs;
Expand All @@ -43,22 +47,24 @@ public EventThrottler(Time time, long ratePerSecond, long intervalMs) {
}

public synchronized void maybeThrottle(int eventsSeen) {
long rateLimit = getRate();
System.err.println("Rate = " + rateLimit);
eventsSeenInLastInterval += eventsSeen;
long now = time.getNanoseconds();
long ellapsedNs = now - startTime;
// if we have completed an interval AND we have seen some events, maybe
// we should take a little nap
if(ellapsedNs > intervalMs * Time.NS_PER_MS && eventsSeenInLastInterval > 0) {
long eventsPerSec = (eventsSeenInLastInterval * Time.NS_PER_SECOND) / ellapsedNs;
if(eventsPerSec > ratesPerSecond) {
if(eventsPerSec > rateLimit) {
// solve for the amount of time to sleep to make us hit the
// correct i/o rate
double maxEventsPerMs = ratesPerSecond / (double) Time.MS_PER_SECOND;
double maxEventsPerMs = rateLimit / (double) Time.MS_PER_SECOND;
long ellapsedMs = ellapsedNs / Time.NS_PER_MS;
long sleepTime = Math.round(eventsSeenInLastInterval / maxEventsPerMs - ellapsedMs);
if(logger.isDebugEnabled())
logger.debug("Natural rate is " + eventsPerSec
+ " events/sec max allowed rate is " + ratesPerSecond
+ " events/sec max allowed rate is " + rateLimit
+ " events/sec, sleeping for " + sleepTime + " ms to compensate.");
if(sleepTime > 0) {
try {
Expand Down

0 comments on commit bd927ff

Please sign in to comment.