Skip to content

Commit

Permalink
Separated job executor service from the job scheduler service.
Browse files Browse the repository at this point in the history
  • Loading branch information
nmihajlovski committed Sep 27, 2015
1 parent 38186c1 commit edf398b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
Expand Up @@ -169,7 +169,7 @@ public void onFailure(Throwable t) {
Callbacks.done(callback, null, t); Callbacks.done(callback, null, t);
} }


}, Jobs.executor()); }, Jobs.scheduler());
} }


private static List<Map<String, Object>> results(List<Row> rows) { private static List<Map<String, Object>> results(List<Row> rows) {
Expand Down
35 changes: 24 additions & 11 deletions rapidoid-http/src/main/java/org/rapidoid/job/Jobs.java
Expand Up @@ -21,8 +21,10 @@
*/ */


import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import org.rapidoid.activity.RapidoidThreadFactory; import org.rapidoid.activity.RapidoidThreadFactory;
Expand All @@ -40,29 +42,40 @@
@Since("4.1.0") @Since("4.1.0")
public class Jobs implements Constants { public class Jobs implements Constants {


private static ScheduledThreadPoolExecutor EXECUTOR; private static ScheduledExecutorService SCHEDULER;

private static Executor EXECUTOR;


private Jobs() {} private Jobs() {}


public static synchronized ScheduledThreadPoolExecutor executor() { public static synchronized ScheduledExecutorService scheduler() {
if (SCHEDULER == null) {
int threads = Conf.option("threads", 100);
SCHEDULER = Executors.newScheduledThreadPool(threads / 2, new RapidoidThreadFactory("jobs"));
}

return SCHEDULER;
}

public static synchronized Executor executor() {
if (EXECUTOR == null) { if (EXECUTOR == null) {
int threads = Conf.option("threads", 100); int threads = Conf.option("threads", 100);
EXECUTOR = new ScheduledThreadPoolExecutor(threads, new RapidoidThreadFactory("jobs")); EXECUTOR = Executors.newFixedThreadPool(threads);
} }


return EXECUTOR; return EXECUTOR;
} }


public static ScheduledFuture<?> schedule(Runnable job, long delay, TimeUnit unit) { public static ScheduledFuture<?> schedule(Runnable job, long delay, TimeUnit unit) {
return executor().schedule(wrap(job), delay, unit); return scheduler().schedule(wrap(job), delay, unit);
} }


public static <T> ScheduledFuture<?> schedule(Callable<T> job, long delay, TimeUnit unit, Callback<T> callback) { public static <T> ScheduledFuture<?> schedule(Callable<T> job, long delay, TimeUnit unit, Callback<T> callback) {
return schedule(callbackJob(job, callback), delay, unit); return schedule(callbackJob(job, callback), delay, unit);
} }


public static ScheduledFuture<?> scheduleAtFixedRate(Runnable job, long initialDelay, long period, TimeUnit unit) { public static ScheduledFuture<?> scheduleAtFixedRate(Runnable job, long initialDelay, long period, TimeUnit unit) {
return executor().scheduleAtFixedRate(wrap(job), initialDelay, period, unit); return scheduler().scheduleAtFixedRate(wrap(job), initialDelay, period, unit);
} }


public static <T> ScheduledFuture<?> scheduleAtFixedRate(Callable<T> job, long initialDelay, long period, public static <T> ScheduledFuture<?> scheduleAtFixedRate(Callable<T> job, long initialDelay, long period,
Expand All @@ -71,20 +84,20 @@ public static <T> ScheduledFuture<?> scheduleAtFixedRate(Callable<T> job, long i
} }


public static ScheduledFuture<?> scheduleWithFixedDelay(Runnable job, long initialDelay, long delay, TimeUnit unit) { public static ScheduledFuture<?> scheduleWithFixedDelay(Runnable job, long initialDelay, long delay, TimeUnit unit) {
return executor().scheduleWithFixedDelay(wrap(job), initialDelay, delay, unit); return scheduler().scheduleWithFixedDelay(wrap(job), initialDelay, delay, unit);
} }


public static <T> ScheduledFuture<?> scheduleWithFixedDelay(Callable<T> job, long initialDelay, long delay, public static <T> ScheduledFuture<?> scheduleWithFixedDelay(Callable<T> job, long initialDelay, long delay,
TimeUnit unit, Callback<T> callback) { TimeUnit unit, Callback<T> callback) {
return scheduleWithFixedDelay(callbackJob(job, callback), initialDelay, delay, unit); return scheduleWithFixedDelay(callbackJob(job, callback), initialDelay, delay, unit);
} }


public static ScheduledFuture<?> execute(Runnable job) { public static void execute(Runnable job) {
return schedule(job, 0, TimeUnit.MILLISECONDS); executor().execute(wrap(job));
} }


public static <T> ScheduledFuture<?> execute(Callable<T> job, Callback<T> callback) { public static <T> void execute(Callable<T> job, Callback<T> callback) {
return execute(callbackJob(job, callback)); execute(callbackJob(job, callback));
} }


public static Runnable wrap(Runnable job) { public static Runnable wrap(Runnable job) {
Expand Down

0 comments on commit edf398b

Please sign in to comment.