Permalink
Browse files

Cleanups in HdfsFetcher. Added Dynamic bandwidth throttling classes

  • Loading branch information...
1 parent 8e088b9 commit 4e41b6ae07428dbd1d75c826070ca82d71d82e28 Chinmay Soman committed Jan 26, 2012
@@ -100,11 +100,7 @@ public HdfsFetcher() {
public HdfsFetcher(Long maxBytesPerSecond, Long reportingIntervalBytes, int bufferSize) {
this.maxBytesPerSecond = maxBytesPerSecond;
if(this.maxBytesPerSecond != null) {
- // this.throttler = new
- // DynamicEventThrottler(this.maxBytesPerSecond);
this.throttler = new EventThrottler(this.maxBytesPerSecond);
- logger.info("Initializing Dynamic Event throttler with rate : "
- + this.maxBytesPerSecond + " bytes / sec");
}
this.reportingIntervalBytes = Utils.notNull(reportingIntervalBytes);
this.bufferSize = bufferSize;
@@ -115,15 +111,13 @@ public HdfsFetcher(DynamicThrottleLimit dynThrottleLimit,
Long reportingIntervalBytes,
int bufferSize,
long minBytesPerSecond) {
- if(dynThrottleLimit != null)
+ if(dynThrottleLimit != null) {
this.maxBytesPerSecond = dynThrottleLimit.getRate();
- else
- this.maxBytesPerSecond = null;
- if(this.maxBytesPerSecond != null) {
this.throttler = new DynamicEventThrottler(dynThrottleLimit);
logger.info("Initializing Dynamic Event throttler with rate : "
+ this.maxBytesPerSecond + " bytes / sec");
- }
+ } else
+ this.maxBytesPerSecond = null;
this.reportingIntervalBytes = Utils.notNull(reportingIntervalBytes);
this.bufferSize = bufferSize;
this.status = null;
@@ -0,0 +1,42 @@
+package voldemort.utils;
+
+public class DynamicEventThrottler extends EventThrottler {
+
+ private long dynamicRatePerSecond = 0l;
+ private DynamicThrottleLimit dynThrottleLimit;
+
+ public DynamicEventThrottler(long ratesPerSecond) {
+ super(ratesPerSecond);
+ this.dynamicRatePerSecond = ratesPerSecond;
+ this.dynThrottleLimit = null;
+ }
+
+ public DynamicEventThrottler(DynamicThrottleLimit dynLimit) {
+ super(dynLimit.getRate());
+ this.dynThrottleLimit = dynLimit;
+ }
+
+ public DynamicEventThrottler(Time time, long ratePerSecond, long intervalMs) {
+ super(time, ratePerSecond, intervalMs);
+ }
+
+ public void updateRate(long l) {
+ this.dynamicRatePerSecond = l;
+ }
+
+ public void incrementNumJobs() {
+ this.dynThrottleLimit.incrementNumJobs();
+ }
+
+ public void decrementNumJobs() {
+ this.dynThrottleLimit.decrementNumJobs();
+ }
+
+ @Override
+ public long getRate() {
+ if(this.dynThrottleLimit != null)
+ return dynThrottleLimit.getRate();
+ else
+ return this.dynamicRatePerSecond;
+ }
+}
@@ -0,0 +1,51 @@
+package voldemort.utils;
+
+import org.apache.log4j.Logger;
+
+public class DynamicThrottleLimit {
+
+ private long perNodeRate = 0l;
+ private long dynamicRatePerSecond = 0l;
+ private int numJobs = 0;
+ private final static Logger logger = Logger.getLogger(DynamicThrottleLimit.class);
+
+ public DynamicThrottleLimit(long val) {
+ this.dynamicRatePerSecond = this.perNodeRate = val;
+ }
+
+ public long getRate() {
+ long retVal;
+ synchronized(this) {
+ retVal = this.dynamicRatePerSecond;
+ }
+ return retVal;
+ }
+
+ public void incrementNumJobs() {
+ synchronized(this) {
+ this.numJobs++;
+ this.dynamicRatePerSecond = numJobs > 0 ? this.perNodeRate / numJobs : this.perNodeRate;
+ logger.debug("# Jobs = " + numJobs + ". Updating throttling rate to : "
+ + this.dynamicRatePerSecond + " bytes / sec");
+ }
+ }
+
+ public void decrementNumJobs() {
+ synchronized(this) {
+ if(this.numJobs > 0)
+ this.numJobs--;
+ this.dynamicRatePerSecond = numJobs > 0 ? this.perNodeRate / numJobs : this.perNodeRate;
+ logger.debug("# Jobs = " + numJobs + ". Updating throttling rate to : "
+ + this.dynamicRatePerSecond + " bytes / sec");
+ }
+ }
+
+ public long getSpeculativeRate() {
+ long dynamicRate = 0;
+ synchronized(this) {
+ int totalJobs = this.numJobs + 1;
+ dynamicRate = totalJobs > 0 ? this.perNodeRate / totalJobs : this.perNodeRate;
+ }
+ return dynamicRate;
+ }
+}

0 comments on commit 4e41b6a

Please sign in to comment.