Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added the ability to stop / start any scheduled process - Helpful for…

… stopping slop scheduled job
  • Loading branch information...
commit a25f113125777a893d6a6e2e03697ecf2808c921 1 parent b3a63a1
@rsumbaly rsumbaly authored afeinberg committed
View
2  contrib/ec2-testing/bin/voldemort-deployer.sh
@@ -15,4 +15,4 @@
# limitations under the License.
#
-$(dirname $0)/run-class.sh $CLASSPATH voldemort.utils.app.VoldemortDeployerApp $@
+$(dirname $0)/run-class.sh voldemort.utils.app.VoldemortDeployerApp $@
View
2  src/java/voldemort/server/rebalance/RebalancerService.java
@@ -32,7 +32,7 @@ public RebalancerService(MetadataStore metadataStore,
@Override
protected void startInner() {
rebalancer.start();
- schedulerService.schedule(rebalancer, new Date(), periodMs);
+ schedulerService.schedule("rebalancer", rebalancer, new Date(), periodMs);
}
@Override
View
95 src/java/voldemort/server/scheduler/SchedulerService.java
@@ -17,13 +17,21 @@
package voldemort.server.scheduler;
import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import javax.management.MBeanOperationInfo;
+
+import org.apache.log4j.Logger;
+
import voldemort.annotations.jmx.JmxManaged;
+import voldemort.annotations.jmx.JmxOperation;
import voldemort.server.AbstractService;
import voldemort.server.ServiceType;
+import voldemort.server.VoldemortService;
import voldemort.utils.Time;
/**
@@ -31,16 +39,53 @@
*
*
*/
+@SuppressWarnings("unchecked")
@JmxManaged(description = "A service that runs scheduled jobs.")
public class SchedulerService extends AbstractService {
+ private static final Logger logger = Logger.getLogger(VoldemortService.class);
+
+ private class ScheduledRunnable {
+
+ private Runnable runnable;
+ private Date delayDate;
+ private long intervalMs;
+
+ ScheduledRunnable(Runnable runnable, Date delayDate, long intervalMs) {
+ this.runnable = runnable;
+ this.delayDate = delayDate;
+ this.intervalMs = intervalMs;
+ }
+
+ ScheduledRunnable(Runnable runnable, Date delayDate) {
+ this(runnable, delayDate, 0);
+ }
+
+ Runnable getRunnable() {
+ return this.runnable;
+ }
+
+ Date getDelayDate() {
+ return this.delayDate;
+ }
+
+ long getIntervalMs() {
+ return this.intervalMs;
+ }
+ }
+
private final ScheduledThreadPoolExecutor scheduler;
private final Time time;
+ private final ConcurrentHashMap<String, ScheduledFuture> scheduledJobResults;
+ private final ConcurrentHashMap<String, ScheduledRunnable> allJobs;
+
public SchedulerService(int schedulerThreads, Time time) {
super(ServiceType.SCHEDULER);
this.time = time;
this.scheduler = new SchedulerThreadPool(schedulerThreads);
+ this.scheduledJobResults = new ConcurrentHashMap<String, ScheduledFuture>();
+ this.allJobs = new ConcurrentHashMap<String, ScheduledRunnable>();
}
@Override
@@ -51,16 +96,58 @@ public void stopInner() {
this.scheduler.shutdownNow();
}
+ @JmxOperation(description = "Disable a particular scheduled job", impact = MBeanOperationInfo.ACTION)
+ public void disable(String id) {
+ if(allJobs.containsKey(id) && scheduledJobResults.containsKey(id)) {
+ ScheduledFuture<?> future = scheduledJobResults.get(id);
+ boolean cancelled = future.cancel(false);
+ if(cancelled == true) {
+ logger.info("Removed '" + id + "' to list of scheduled jobs");
+ scheduledJobResults.remove(id);
+ }
+ }
+ }
+
+ @JmxOperation(description = "Enable a particular scheduled job", impact = MBeanOperationInfo.ACTION)
+ public void enable(String id) {
+ if(allJobs.containsKey(id) && !scheduledJobResults.containsKey(id)) {
+ ScheduledRunnable scheduledRunnable = allJobs.get(id);
+ logger.info("Adding '" + id + "' to list of scheduled jobs");
+ if(scheduledRunnable.getIntervalMs() > 0) {
+ schedule(id,
+ scheduledRunnable.getRunnable(),
+ scheduledRunnable.getDelayDate(),
+ scheduledRunnable.getIntervalMs());
+ } else {
+ schedule(id, scheduledRunnable.getRunnable(), scheduledRunnable.getDelayDate());
+ }
+
+ }
+ }
+
public void scheduleNow(Runnable runnable) {
scheduler.execute(runnable);
}
- public void schedule(Runnable runnable, Date timeToRun) {
- scheduler.schedule(runnable, delayMs(timeToRun), TimeUnit.MILLISECONDS);
+ public void schedule(String id, Runnable runnable, Date timeToRun) {
+ ScheduledFuture<?> future = scheduler.schedule(runnable,
+ delayMs(timeToRun),
+ TimeUnit.MILLISECONDS);
+ if(!allJobs.containsKey(id)) {
+ allJobs.put(id, new ScheduledRunnable(runnable, timeToRun));
+ }
+ scheduledJobResults.put(id, future);
}
- public void schedule(Runnable runnable, Date nextRun, long periodMs) {
- scheduler.scheduleAtFixedRate(runnable, delayMs(nextRun), periodMs, TimeUnit.MILLISECONDS);
+ public void schedule(String id, Runnable runnable, Date nextRun, long periodMs) {
+ ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(runnable,
+ delayMs(nextRun),
+ periodMs,
+ TimeUnit.MILLISECONDS);
+ if(!allJobs.containsKey(id)) {
+ allJobs.put(id, new ScheduledRunnable(runnable, nextRun, periodMs));
+ }
+ scheduledJobResults.put(id, future);
}
private long delayMs(Date runDate) {
View
6 src/java/voldemort/server/storage/StorageService.java
@@ -201,7 +201,8 @@ protected void startInner() {
registerEngine(slopEngine);
storeRepository.setSlopStore(slopEngine);
logger.info("Slop store registered");
- scheduler.schedule(new SlopPusherJob(storeRepository,
+ scheduler.schedule("slop",
+ new SlopPusherJob(storeRepository,
metadata.getCluster(),
failureDetector,
voldemortConfig.getSlopMaxWriteBytesPerSec()),
@@ -467,7 +468,8 @@ private void scheduleCleanupJob(StoreDefinition storeDef,
SystemTime.INSTANCE,
throttler);
- this.scheduler.schedule(cleanupJob,
+ this.scheduler.schedule("cleanup-" + storeDef.getName(),
+ cleanupJob,
startTime,
voldemortConfig.getRetentionCleanupScheduledPeriodInHour()
* Time.MS_PER_HOUR);
View
3  test/integration/voldemort/performance/benchmark/Workload.java
@@ -264,7 +264,8 @@ public void init(Props props) {
String keyType = props.getString(Benchmark.KEY_TYPE, Benchmark.STRING_KEY_TYPE);
String recordSelection = props.getString(Benchmark.RECORD_SELECTION,
Benchmark.UNIFORM_RECORD_SELECTION);
- boolean hasTransforms = props.getString(Benchmark.HAS_TRANSFORMS).compareTo("true") == 0;
+ boolean hasTransforms = props.getString(Benchmark.HAS_TRANSFORMS, "false")
+ .compareTo("true") == 0;
double readProportion = (double) readPercent / (double) 100;
double writeProportion = (double) writePercent / (double) 100;
Please sign in to comment.
Something went wrong with that request. Please try again.