Permalink
Browse files

Initial changes for setting minimum throttle limit per store

  • Loading branch information...
Chinmay Soman
Chinmay Soman committed Jan 25, 2012
1 parent 5b35d9f commit af20132ad83bf7b04502ab5d4c590c76a7dfbd11
@@ -67,7 +67,8 @@
private static final AtomicInteger copyCount = new AtomicInteger(0);
private AsyncOperationStatus status;
private EventThrottler throttler = null;
- private long numJobs = 0;
+ private long minBytesPerSecond = 0;
+ private DynamicThrottleLimit globalThrottleLimit = null;
public HdfsFetcher(Props props) {
this(props.containsKey("fetcher.max.bytes.per.sec") ? props.getBytes("fetcher.max.bytes.per.sec")
@@ -83,7 +84,9 @@ public HdfsFetcher(Props props) {
public HdfsFetcher(Props props, DynamicThrottleLimit dynThrottleLimit) {
this(dynThrottleLimit,
props.getBytes("fetcher.reporting.interval.bytes", REPORTING_INTERVAL_BYTES),
- (int) props.getBytes("hdfs.fetcher.buffer.size", DEFAULT_BUFFER_SIZE));
+ (int) props.getBytes("hdfs.fetcher.buffer.size", DEFAULT_BUFFER_SIZE),
+ props.containsKey("fetcher.min.bytes.per.sec") ? props.getBytes("fetcher.min.bytes.per.sec")
+ : 0);
logger.info("Created hdfs fetcher with throttle rate " + dynThrottleLimit.getRate()
+ ", buffer size " + bufferSize + ", reporting interval bytes "
@@ -110,8 +113,12 @@ public HdfsFetcher(Long maxBytesPerSecond, Long reportingIntervalBytes, int buff
public HdfsFetcher(DynamicThrottleLimit dynThrottleLimit,
Long reportingIntervalBytes,
- int bufferSize) {
- this.maxBytesPerSecond = dynThrottleLimit.getRate();
+ int bufferSize,
+ long minBytesPerSecond) {
+ if(dynThrottleLimit != null)
+ this.maxBytesPerSecond = dynThrottleLimit.getRate();
+ else
+ this.maxBytesPerSecond = null;
if(this.maxBytesPerSecond != null) {
this.throttler = new DynamicEventThrottler(dynThrottleLimit);
logger.info("Initializing Dynamic Event throttler with rate : "
@@ -120,12 +127,15 @@ public HdfsFetcher(DynamicThrottleLimit dynThrottleLimit,
this.reportingIntervalBytes = Utils.notNull(reportingIntervalBytes);
this.bufferSize = bufferSize;
this.status = null;
+ this.minBytesPerSecond = minBytesPerSecond;
+ this.globalThrottleLimit = dynThrottleLimit;
}
public File fetch(String sourceFileUrl, String destinationFile) throws IOException {
- if(throttler != null) {
- DynamicEventThrottler dynThrottler = (DynamicEventThrottler) this.throttler;
- dynThrottler.incrementNumJobs();
+ if(this.globalThrottleLimit != null) {
+ if(this.globalThrottleLimit.getSpeculativeRate() < this.minBytesPerSecond)
+ throw new VoldemortException("Too many push jobs.");
+ this.globalThrottleLimit.incrementNumJobs();
}
Path path = new Path(sourceFileUrl);
@@ -154,9 +164,8 @@ public File fetch(String sourceFileUrl, String destinationFile) throws IOExcepti
return null;
}
} finally {
- if(throttler != null) {
- DynamicEventThrottler dynThrottler = (DynamicEventThrottler) this.throttler;
- dynThrottler.decrementNumJobs();
+ if(this.globalThrottleLimit != null) {
+ this.globalThrottleLimit.decrementNumJobs();
}
JmxUtils.unregisterMbean(jmxName);
}
@@ -800,6 +800,15 @@ public void operate() {
updateStatus(message);
logger.info(message);
}
+ } catch(VoldemortException ve) {
+ String errorMessage = "File fetcher failed for "
+ + fetchUrl
+ + " and store '"
+ + storeName
+ + "' due to too many push jobs happening at the same time.";
+ updateStatus(errorMessage);
+ logger.error(errorMessage);
+ throw new VoldemortException(errorMessage);
} catch(Exception e) {
throw new VoldemortException("Exception in Fetcher = " + e.getMessage());
}

0 comments on commit af20132

Please sign in to comment.