Permalink
Browse files

Code cleanup after review round 1

  • Loading branch information...
1 parent fde488a commit fe1b415254450fc3bd6ea6f844139f4de34ec770 Chinmay Soman committed Feb 3, 2012
@@ -39,6 +39,7 @@
import voldemort.VoldemortException;
import voldemort.annotations.jmx.JmxGetter;
+import voldemort.server.VoldemortConfig;
import voldemort.server.protocol.admin.AsyncOperationStatus;
import voldemort.store.readonly.FileFetcher;
import voldemort.store.readonly.ReadOnlyStorageMetadata;
@@ -49,7 +50,6 @@
import voldemort.utils.DynamicThrottleLimit;
import voldemort.utils.EventThrottler;
import voldemort.utils.JmxUtils;
-import voldemort.utils.Props;
import voldemort.utils.Time;
import voldemort.utils.Utils;
@@ -59,8 +59,6 @@
public class HdfsFetcher implements FileFetcher {
private static final Logger logger = Logger.getLogger(HdfsFetcher.class);
- private static final long REPORTING_INTERVAL_BYTES = 25 * 1024 * 1024;
- private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private final Long maxBytesPerSecond, reportingIntervalBytes;
private final int bufferSize;
@@ -70,48 +68,53 @@
private long minBytesPerSecond = 0;
private DynamicThrottleLimit globalThrottleLimit = null;
- public HdfsFetcher(Props props) {
- this(props.containsKey("fetcher.max.bytes.per.sec") ? props.getBytes("fetcher.max.bytes.per.sec")
- : null,
- props.getBytes("fetcher.reporting.interval.bytes", REPORTING_INTERVAL_BYTES),
- (int) props.getBytes("hdfs.fetcher.buffer.size", DEFAULT_BUFFER_SIZE));
+ public HdfsFetcher(VoldemortConfig config) {
+ this(config.getMaxBytesPerSecond(),
+ config.getReportingIntervalBytes(),
+ config.getFetcherBufferSize());
logger.info("Created hdfs fetcher with throttle rate " + maxBytesPerSecond
+ ", buffer size " + bufferSize + ", reporting interval bytes "
+ reportingIntervalBytes);
}
- public HdfsFetcher(Props props, DynamicThrottleLimit dynThrottleLimit) {
+ public HdfsFetcher(VoldemortConfig config, DynamicThrottleLimit dynThrottleLimit) {
this(dynThrottleLimit,
- props.getBytes("fetcher.reporting.interval.bytes", REPORTING_INTERVAL_BYTES),
- (int) props.getBytes("hdfs.fetcher.buffer.size", DEFAULT_BUFFER_SIZE),
- props.containsKey("fetcher.min.bytes.per.sec") ? props.getBytes("fetcher.min.bytes.per.sec")
- : 0);
+ config.getReportingIntervalBytes(),
+ config.getFetcherBufferSize(),
+ config.getMinBytesPerSecond());
logger.info("Created hdfs fetcher with throttle rate " + dynThrottleLimit.getRate()
+ ", buffer size " + bufferSize + ", reporting interval bytes "
+ reportingIntervalBytes);
}
public HdfsFetcher() {
- this((Long) null, REPORTING_INTERVAL_BYTES, DEFAULT_BUFFER_SIZE);
+ this((Long) null,
+ VoldemortConfig.REPORTING_INTERVAL_BYTES,
+ VoldemortConfig.DEFAULT_BUFFER_SIZE);
}
public HdfsFetcher(Long maxBytesPerSecond, Long reportingIntervalBytes, int bufferSize) {
- this.maxBytesPerSecond = maxBytesPerSecond;
- if(this.maxBytesPerSecond != null) {
- this.throttler = new EventThrottler(this.maxBytesPerSecond);
- }
- this.reportingIntervalBytes = Utils.notNull(reportingIntervalBytes);
- this.bufferSize = bufferSize;
- this.status = null;
+ this(null, maxBytesPerSecond, reportingIntervalBytes, bufferSize, 0);
+ }
+
+ public HdfsFetcher(DynamicThrottleLimit dynThrottleLimit,
+ Long reportingIntervalBytes,
+ int bufferSize,
+ long minBytesPerSecond) {
+ this(dynThrottleLimit, null, reportingIntervalBytes, bufferSize, minBytesPerSecond);
}
public HdfsFetcher(DynamicThrottleLimit dynThrottleLimit,
+ Long maxBytesPerSecond,
Long reportingIntervalBytes,
int bufferSize,
long minBytesPerSecond) {
- if(dynThrottleLimit != null && dynThrottleLimit.getRate() != 0) {
+ if(maxBytesPerSecond != null) {
+ this.maxBytesPerSecond = maxBytesPerSecond;
+ this.throttler = new EventThrottler(this.maxBytesPerSecond);
+ } else if(dynThrottleLimit != null && dynThrottleLimit.getRate() != 0) {
this.maxBytesPerSecond = dynThrottleLimit.getRate();
this.throttler = new DynamicEventThrottler(dynThrottleLimit);
this.globalThrottleLimit = dynThrottleLimit;
@@ -132,17 +135,18 @@ public File fetch(String sourceFileUrl, String destinationFile) throws IOExcepti
this.globalThrottleLimit.incrementNumJobs();
}
- Path path = new Path(sourceFileUrl);
- Configuration config = new Configuration();
- config.setInt("io.socket.receive.buffer", bufferSize);
- config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
- ConfigurableSocketFactory.class.getName());
- FileSystem fs = path.getFileSystem(config);
-
- CopyStats stats = new CopyStats(sourceFileUrl, sizeOfPath(fs, path));
- ObjectName jmxName = JmxUtils.registerMbean("hdfs-copy-" + copyCount.getAndIncrement(),
- stats);
+ ObjectName jmxName = null;
try {
+
+ Path path = new Path(sourceFileUrl);
+ Configuration config = new Configuration();
+ config.setInt("io.socket.receive.buffer", bufferSize);
+ config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
+ ConfigurableSocketFactory.class.getName());
+ FileSystem fs = path.getFileSystem(config);
+
+ CopyStats stats = new CopyStats(sourceFileUrl, sizeOfPath(fs, path));
+ jmxName = JmxUtils.registerMbean("hdfs-copy-" + copyCount.getAndIncrement(), stats);
File destination = new File(destinationFile);
if(destination.exists()) {
@@ -439,15 +443,15 @@ public static void main(String[] args) throws Exception {
long maxBytesPerSec = 1024 * 1024 * 1024;
Path p = new Path(url);
Configuration config = new Configuration();
- config.setInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+ config.setInt("io.file.buffer.size", VoldemortConfig.DEFAULT_BUFFER_SIZE);
config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
ConfigurableSocketFactory.class.getName());
config.setInt("io.socket.receive.buffer", 1 * 1024 * 1024 - 10000);
FileStatus status = p.getFileSystem(config).getFileStatus(p);
long size = status.getLen();
HdfsFetcher fetcher = new HdfsFetcher(maxBytesPerSec,
- REPORTING_INTERVAL_BYTES,
- DEFAULT_BUFFER_SIZE);
+ VoldemortConfig.REPORTING_INTERVAL_BYTES,
+ VoldemortConfig.DEFAULT_BUFFER_SIZE);
long start = System.currentTimeMillis();
File location = fetcher.fetch(url, System.getProperty("java.io.tmpdir") + File.separator
+ start);
@@ -51,6 +51,8 @@
public static final String VOLDEMORT_CONFIG_DIR = "VOLDEMORT_CONFIG_DIR";
private static final String VOLDEMORT_NODE_ID_VAR_NAME = "VOLDEMORT_NODE_ID";
public static int VOLDEMORT_DEFAULT_ADMIN_PORT = 6660;
+ public static final long REPORTING_INTERVAL_BYTES = 25 * 1024 * 1024;
+ public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private int nodeId;
@@ -92,6 +94,10 @@
private String readOnlyStorageDir;
private String readOnlySearchStrategy;
private int readOnlyDeleteBackupTimeMs;
+ private long maxBytesPerSecond;
+ private long minBytesPerSecond;
+ private long reportingIntervalBytes;
+ private int fetcherBufferSize;
private int coreThreads;
private int maxThreads;
@@ -224,6 +230,12 @@ public VoldemortConfig(Props props) {
+ File.separator
+ "read-only");
this.readOnlyDeleteBackupTimeMs = props.getInt("readonly.delete.backup.ms", 0);
+ this.maxBytesPerSecond = props.getBytes("fetcher.max.bytes.per.sec", 0);
+ this.minBytesPerSecond = props.getBytes("fetcher.min.bytes.per.sec", 0);
+ this.reportingIntervalBytes = props.getBytes("fetcher.reporting.interval.bytes",
+ REPORTING_INTERVAL_BYTES);
+ this.fetcherBufferSize = (int) props.getBytes("hdfs.fetcher.buffer.size",
+ DEFAULT_BUFFER_SIZE);
this.mysqlUsername = props.getString("mysql.user", "root");
this.mysqlPassword = props.getString("mysql.password", "");
@@ -1388,6 +1400,38 @@ public String getReadOnlySearchStrategy() {
return readOnlySearchStrategy;
}
+ public long getMaxBytesPerSecond() {
+ return maxBytesPerSecond;
+ }
+
+ public void setMaxBytesPerSecond(long maxBytesPerSecond) {
+ this.maxBytesPerSecond = maxBytesPerSecond;
+ }
+
+ public long getMinBytesPerSecond() {
+ return minBytesPerSecond;
+ }
+
+ public void setMinBytesPerSecond(long minBytesPerSecond) {
+ this.minBytesPerSecond = minBytesPerSecond;
+ }
+
+ public long getReportingIntervalBytes() {
+ return reportingIntervalBytes;
+ }
+
+ public void setReportingIntervalBytes(long reportingIntervalBytes) {
+ this.reportingIntervalBytes = reportingIntervalBytes;
+ }
+
+ public int getFetcherBufferSize() {
+ return fetcherBufferSize;
+ }
+
+ public void setFetcherBufferSize(int fetcherBufferSize) {
+ this.fetcherBufferSize = fetcherBufferSize;
+ }
+
public void setReadOnlySearchStrategy(String readOnlySearchStrategy) {
this.readOnlySearchStrategy = readOnlySearchStrategy;
}
@@ -31,6 +31,7 @@
import voldemort.VoldemortException;
import voldemort.server.ServiceType;
+import voldemort.server.VoldemortConfig;
import voldemort.server.VoldemortServer;
import voldemort.server.http.VoldemortServletContextListener;
import voldemort.server.storage.StorageService;
@@ -40,7 +41,6 @@
import voldemort.store.readonly.ReadOnlyStorageEngine;
import voldemort.store.readonly.ReadOnlyUtils;
import voldemort.utils.ByteArray;
-import voldemort.utils.Props;
import voldemort.utils.ReflectUtils;
import voldemort.utils.Utils;
@@ -117,9 +117,8 @@ private void setFetcherClass(VoldemortServer server) {
logger.info("Loading fetcher " + className);
Class<?> cls = Class.forName(className.trim());
this.fileFetcher = (FileFetcher) ReflectUtils.callConstructor(cls,
- new Class<?>[] { Props.class },
- new Object[] { server.getVoldemortConfig()
- .getAllProps() });
+ new Class<?>[] { VoldemortConfig.class },
+ new Object[] { server.getVoldemortConfig() });
} catch(Exception e) {
throw new VoldemortException("Error loading file fetcher class " + className, e);
}
@@ -66,7 +66,6 @@
import voldemort.utils.EventThrottler;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.Pair;
-import voldemort.utils.Props;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.ReflectUtils;
import voldemort.utils.Utils;
@@ -132,11 +131,11 @@ private void setFetcherClass(VoldemortConfig voldemortConfig) {
Class<?> cls = Class.forName(className.trim());
this.fileFetcher = (FileFetcher) ReflectUtils.callConstructor(cls,
new Class<?>[] {
- Props.class,
+ VoldemortConfig.class,
storageService.getDynThrottleLimit()
.getClass() },
new Object[] {
- voldemortConfig.getAllProps(),
+ voldemortConfig,
storageService.getDynThrottleLimit() });
} catch(Exception e) {
throw new VoldemortException("Error loading file fetcher class " + className, e);
@@ -91,7 +91,6 @@
import voldemort.utils.EventThrottler;
import voldemort.utils.JmxUtils;
import voldemort.utils.Pair;
-import voldemort.utils.Props;
import voldemort.utils.ReflectUtils;
import voldemort.utils.SystemTime;
import voldemort.utils.Time;
@@ -109,8 +108,6 @@
private static final Logger logger = Logger.getLogger(StorageService.class.getName());
- private static final long DEFAULT_FETCH_RATE = 0;
-
private final VoldemortConfig voldemortConfig;
private final StoreRepository storeRepository;
private final SchedulerService scheduler;
@@ -162,12 +159,14 @@ public StorageService(StoreRepository storeRepository,
/*
* Initialize the dynamic throttle limit based on the per node limit
- * config
+ * config only if read-only engine is being used.
*/
- Props props = this.voldemortConfig.getAllProps();
- long rate = props.containsKey("fetcher.max.bytes.per.sec") ? props.getBytes("fetcher.max.bytes.per.sec")
- : DEFAULT_FETCH_RATE;
- this.dynThrottleLimit = new DynamicThrottleLimit(rate);
+ if(this.voldemortConfig.getStorageConfigurations()
+ .contains(ReadOnlyStorageConfiguration.class.getName())) {
+ long rate = this.voldemortConfig.getMaxBytesPerSecond();
+ this.dynThrottleLimit = new DynamicThrottleLimit(rate);
+ } else
+ this.dynThrottleLimit = null;
}
private void initStorageConfig(String configClassName) {
@@ -13,35 +13,29 @@ public DynamicThrottleLimit(long val) {
this.dynamicRatePerSecond = this.perNodeRate = val;
}
- public long getRate() {
+ synchronized public long getRate() {
return this.dynamicRatePerSecond;
}
- public void incrementNumJobs() {
- synchronized(this) {
- this.numJobs++;
- this.dynamicRatePerSecond = numJobs > 0 ? this.perNodeRate / numJobs : this.perNodeRate;
- logger.debug("# Jobs = " + numJobs + ". Updating throttling rate to : "
- + this.dynamicRatePerSecond + " bytes / sec");
- }
+ synchronized public void incrementNumJobs() {
+ this.numJobs++;
+ this.dynamicRatePerSecond = numJobs > 0 ? this.perNodeRate / numJobs : this.perNodeRate;
+ logger.debug("# Jobs = " + numJobs + ". Updating throttling rate to : "
+ + this.dynamicRatePerSecond + " bytes / sec");
}
- public void decrementNumJobs() {
- synchronized(this) {
- if(this.numJobs > 0)
- this.numJobs--;
- this.dynamicRatePerSecond = numJobs > 0 ? this.perNodeRate / numJobs : this.perNodeRate;
- logger.debug("# Jobs = " + numJobs + ". Updating throttling rate to : "
- + this.dynamicRatePerSecond + " bytes / sec");
- }
+ synchronized public void decrementNumJobs() {
+ if(this.numJobs > 0)
+ this.numJobs--;
+ this.dynamicRatePerSecond = numJobs > 0 ? this.perNodeRate / numJobs : this.perNodeRate;
+ logger.debug("# Jobs = " + numJobs + ". Updating throttling rate to : "
+ + this.dynamicRatePerSecond + " bytes / sec");
}
- public long getSpeculativeRate() {
+ synchronized public long getSpeculativeRate() {
long dynamicRate = 0;
- synchronized(this) {
- int totalJobs = this.numJobs + 1;
- dynamicRate = totalJobs > 0 ? this.perNodeRate / totalJobs : this.perNodeRate;
- }
+ int totalJobs = this.numJobs + 1;
+ dynamicRate = totalJobs > 0 ? this.perNodeRate / totalJobs : this.perNodeRate;
return dynamicRate;
}
}

0 comments on commit fe1b415

Please sign in to comment.