Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for Virtual Threads #906

Merged
merged 17 commits into from
Jan 2, 2024
13 changes: 3 additions & 10 deletions core/src/main/java/org/jobrunr/server/BackgroundJobServer.java
Expand Up @@ -84,7 +84,7 @@ public BackgroundJobServer(StorageProvider storageProvider, JsonMapper jsonMappe
this.backgroundJobRunners = initializeBackgroundJobRunners(jobActivator);
this.jobDefaultFilters = new JobDefaultFilters();
this.jobServerStats = new JobServerStats();
this.workDistributionStrategy = createWorkDistributionStrategy(configuration);
this.workDistributionStrategy = createWorkDistributionStrategy();
this.serverZooKeeper = createServerZooKeeper();
this.jobZooKeeper = createJobZooKeeper();
this.backgroundJobPerformerFactory = loadBackgroundJobPerformerFactory();
Expand Down Expand Up @@ -273,7 +273,7 @@ private void stopZooKeepers() {
}

private void startWorkers() {
jobExecutor = loadJobRunrExecutor();
jobExecutor = configuration.getBackgroundJobServerWorkerPolicy().toJobRunrExecutor();
jobExecutor.start();
}

Expand Down Expand Up @@ -327,7 +327,7 @@ private JobZooKeeper createJobZooKeeper() {
return new JobZooKeeper(this);
}

private WorkDistributionStrategy createWorkDistributionStrategy(BackgroundJobServerConfiguration configuration) {
private WorkDistributionStrategy createWorkDistributionStrategy() {
return configuration.getBackgroundJobServerWorkerPolicy().toWorkDistributionStrategy(this);
}

Expand All @@ -338,13 +338,6 @@ private BackgroundJobPerformerFactory loadBackgroundJobPerformerFactory() {
.orElseGet(BasicBackgroundJobPerformerFactory::new);
}

private JobRunrExecutor loadJobRunrExecutor() {
ServiceLoader<JobRunrExecutor> serviceLoader = ServiceLoader.load(JobRunrExecutor.class);
return stream(spliteratorUnknownSize(serviceLoader.iterator(), Spliterator.ORDERED), false)
.min((a, b) -> compare(b.getPriority(), a.getPriority()))
.orElseGet(() -> new ScheduledThreadPoolJobRunrExecutor(workDistributionStrategy.getWorkerCount(), "backgroundjob-worker-pool"));
}

private static class BackgroundJobServerLifecycleLock implements AutoCloseable {
private final ReentrantLock reentrantLock = new ReentrantLock();

Expand Down
@@ -1,6 +1,9 @@
package org.jobrunr.server;

import org.jobrunr.server.configuration.*;
import org.jobrunr.server.configuration.BackgroundJobServerWorkerPolicy;
import org.jobrunr.server.configuration.ConcurrentJobModificationPolicy;
import org.jobrunr.server.configuration.DefaultBackgroundJobServerWorkerPolicy;
import org.jobrunr.server.configuration.DefaultConcurrentJobModificationPolicy;

import java.net.InetAddress;
import java.net.UnknownHostException;
Expand Down Expand Up @@ -89,7 +92,7 @@ public BackgroundJobServerConfiguration andPollIntervalInSeconds(int pollInterva
* @return the same configuration instance which provides a fluent api
*/
public BackgroundJobServerConfiguration andWorkerCount(int workerCount) {
this.backgroundJobServerWorkerPolicy = new FixedSizeBackgroundJobServerWorkerPolicy(workerCount);
this.backgroundJobServerWorkerPolicy = new DefaultBackgroundJobServerWorkerPolicy(workerCount);
return this;
}

Expand Down
@@ -0,0 +1,51 @@
package org.jobrunr.server.configuration;

import org.jobrunr.server.threadpool.JobRunrExecutor;
import org.jobrunr.server.threadpool.ScheduledThreadPoolJobRunrExecutor;
import org.jobrunr.server.threadpool.VirtualThreadPoolJobRunrExecutor;

import java.util.function.Function;

import static org.jobrunr.utils.VersionNumber.isNewerOrEqualTo;

public enum BackgroundJobServerThreadType {
PlatformThreads("1.8") {
@Override
public Function<Integer, JobRunrExecutor> getJobRunrExecutor() {
return ScheduledThreadPoolJobRunrExecutor::new;
}

},
VirtualThreads("21") {
@Override
public Function<Integer, JobRunrExecutor> getJobRunrExecutor() {
return VirtualThreadPoolJobRunrExecutor::new;
}

@Override
public int getDefaultWorkerCount() {
return super.getDefaultWorkerCount() * 2;
}
};

private final String minimumJavaVersion;

BackgroundJobServerThreadType(String minimumJavaVersion) {
this.minimumJavaVersion = minimumJavaVersion;
}

abstract Function<Integer, JobRunrExecutor> getJobRunrExecutor();

public int getDefaultWorkerCount() {
// see https://jobs.zalando.com/en/tech/blog/how-to-set-an-ideal-thread-pool-size
return (Runtime.getRuntime().availableProcessors() * 8);
}

public String getMinimumJavaVersion() {
return minimumJavaVersion;
}

public static BackgroundJobServerThreadType getDefaultThreadType() {
return isNewerOrEqualTo(System.getProperty("java.version"), "21") ? BackgroundJobServerThreadType.VirtualThreads : BackgroundJobServerThreadType.PlatformThreads;
}
}
Expand Up @@ -2,8 +2,11 @@

import org.jobrunr.server.BackgroundJobServer;
import org.jobrunr.server.strategy.WorkDistributionStrategy;
import org.jobrunr.server.threadpool.JobRunrExecutor;

public interface BackgroundJobServerWorkerPolicy {

WorkDistributionStrategy toWorkDistributionStrategy(BackgroundJobServer backgroundJobServer);

JobRunrExecutor toJobRunrExecutor();
}
Expand Up @@ -3,18 +3,48 @@
import org.jobrunr.server.BackgroundJobServer;
import org.jobrunr.server.strategy.BasicWorkDistributionStrategy;
import org.jobrunr.server.strategy.WorkDistributionStrategy;
import org.jobrunr.server.threadpool.JobRunrExecutor;

import java.util.function.Function;

import static org.jobrunr.utils.VersionNumber.isOlderThan;

public class DefaultBackgroundJobServerWorkerPolicy implements BackgroundJobServerWorkerPolicy {

private final int workerCount;
private final Function<Integer, JobRunrExecutor> jobRunrExecutorFunction;

public DefaultBackgroundJobServerWorkerPolicy() {
// see https://jobs.zalando.com/en/tech/blog/how-to-set-an-ideal-thread-pool-size
workerCount = (Runtime.getRuntime().availableProcessors() * 8);
this(BackgroundJobServerThreadType.getDefaultThreadType());
}

public DefaultBackgroundJobServerWorkerPolicy(BackgroundJobServerThreadType threadType) {
this(threadType.getDefaultWorkerCount(), threadType);
}

public DefaultBackgroundJobServerWorkerPolicy(int workerCount) {
this(workerCount, BackgroundJobServerThreadType.getDefaultThreadType());
}

public DefaultBackgroundJobServerWorkerPolicy(int workerCount, BackgroundJobServerThreadType threadType) {
this(workerCount, threadType.getJobRunrExecutor());
if (isOlderThan(System.getProperty("java.version"), threadType.getMinimumJavaVersion())) {
throw new UnsupportedOperationException("The required minimum Java version to use " + threadType + " is " + threadType.getMinimumJavaVersion());
}
}

public DefaultBackgroundJobServerWorkerPolicy(int workerCount, Function<Integer, JobRunrExecutor> jobRunrExecutorFunction) {
this.workerCount = workerCount;
this.jobRunrExecutorFunction = jobRunrExecutorFunction;
}

@Override
public WorkDistributionStrategy toWorkDistributionStrategy(BackgroundJobServer backgroundJobServer) {
return new BasicWorkDistributionStrategy(backgroundJobServer, workerCount);
}

@Override
public JobRunrExecutor toJobRunrExecutor() {
return jobRunrExecutorFunction.apply(workerCount);
}
}
Expand Up @@ -3,6 +3,8 @@
import org.jobrunr.server.BackgroundJobServer;
import org.jobrunr.server.strategy.BasicWorkDistributionStrategy;
import org.jobrunr.server.strategy.WorkDistributionStrategy;
import org.jobrunr.server.threadpool.JobRunrExecutor;
import org.jobrunr.server.threadpool.ScheduledThreadPoolJobRunrExecutor;

public class FixedSizeBackgroundJobServerWorkerPolicy implements BackgroundJobServerWorkerPolicy {

Expand All @@ -16,4 +18,9 @@ public FixedSizeBackgroundJobServerWorkerPolicy(int workerCount) {
public WorkDistributionStrategy toWorkDistributionStrategy(BackgroundJobServer backgroundJobServer) {
return new BasicWorkDistributionStrategy(backgroundJobServer, workerCount);
}

@Override
public JobRunrExecutor toJobRunrExecutor() {
return new ScheduledThreadPoolJobRunrExecutor(workerCount, "backgroundjob-zookeeper-pool");
}
}
Expand Up @@ -3,11 +3,9 @@
import java.util.concurrent.Executor;

public interface JobRunrExecutor extends Executor {

int getPriority();
int getWorkerCount();

void start();

void stop();

}
Expand Up @@ -10,16 +10,22 @@
public class ScheduledThreadPoolJobRunrExecutor extends java.util.concurrent.ScheduledThreadPoolExecutor implements JobRunrExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledThreadPoolJobRunrExecutor.class);
private final int workerCount;

public ScheduledThreadPoolJobRunrExecutor(int corePoolSize) {
this(corePoolSize, "backgroundjob-zookeeper-pool");
}

public ScheduledThreadPoolJobRunrExecutor(int corePoolSize, String threadNamePrefix) {
super(corePoolSize, new NamedThreadFactory(threadNamePrefix));
this.workerCount = corePoolSize;
setMaximumPoolSize(corePoolSize * 2);
setKeepAliveTime(1, TimeUnit.MINUTES);
}

@Override
public int getPriority() {
return 10;
public int getWorkerCount() {
return workerCount;
}

@Override
Expand Down
@@ -0,0 +1,60 @@
package org.jobrunr.server.threadpool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.jobrunr.utils.reflection.ReflectionUtils.getMethod;

public class VirtualThreadPoolJobRunrExecutor implements JobRunrExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(VirtualThreadPoolJobRunrExecutor.class);

private final ExecutorService executorService;
private final int workerCount;
private boolean started;

public VirtualThreadPoolJobRunrExecutor(int workerCount) {
this(workerCount, createVirtualThreadExecutorService());
}

public VirtualThreadPoolJobRunrExecutor(int workerCount, ExecutorService executorService) {
this.workerCount = workerCount;
this.executorService = executorService;
}

@Override
public int getWorkerCount() {
return workerCount;
}

@Override
public void start() {
this.started = true;
LOGGER.info("ThreadManager of type 'VirtualThreadPerTask' started");
}

@Override
public void stop() {
this.started = false;
}

@Override
public void execute(Runnable command) {
if (started) {
executorService.submit(command);
}
}

static ExecutorService createVirtualThreadExecutorService() {
try {
Method newVirtualThreadPerTaskExecutor = getMethod(Executors.class, "newVirtualThreadPerTaskExecutor");
return (ExecutorService) newVirtualThreadPerTaskExecutor.invoke(null);
} catch (ReflectiveOperationException e) {
throw new IllegalStateException("Expected Executors.newVirtualThreadPerTaskExecutor to be present on Java " + System.getProperty("java.version"));
}
}
}
48 changes: 30 additions & 18 deletions core/src/main/java/org/jobrunr/utils/VersionNumber.java
Expand Up @@ -2,6 +2,7 @@

import java.util.Objects;

import static java.util.Optional.ofNullable;
import static org.jobrunr.utils.StringUtils.isNotNullOrEmpty;
import static org.jobrunr.utils.StringUtils.isNullOrEmpty;
import static org.jobrunr.utils.StringUtils.substringAfter;
Expand All @@ -11,19 +12,21 @@ public class VersionNumber implements Comparable<VersionNumber> {

private final String completeVersion;
private final String version;
private final int majorVersion;
private final int minorVersion;
private final int patchVersion;
private final String majorVersion;
private final String minorVersion;
private final String patchVersion;
private final String updateVersion;
private final String qualifier;

public VersionNumber(String completeVersion) {
this.completeVersion = completeVersion;
this.version = substringBefore(completeVersion, "-");
this.qualifier = substringAfter(completeVersion, "-");
String[] split = this.version.split("\\.");
this.majorVersion = split.length > 0 ? Integer.parseInt(split[0]) : 0;
this.minorVersion = split.length > 1 ? Integer.parseInt(split[1]) : 0;
this.patchVersion = split.length > 2 ? Integer.parseInt(split[2]) : 0;
this.majorVersion = split.length > 0 ? split[0] : "0";
this.minorVersion = split.length > 1 ? split[1] : "0";
this.patchVersion = split.length > 2 ? substringBefore(split[2], "_") : "0";
this.updateVersion = split.length > 2 ? ofNullable(substringAfter(split[2], "_")).orElse("0") : "0";
}

public String getCompleteVersion() {
Expand Down Expand Up @@ -64,14 +67,17 @@ public int hashCode() {

@Override
public int compareTo(VersionNumber o) {
if (majorVersion < o.majorVersion) return -1;
else if (majorVersion > o.majorVersion) return 1;
int majorVersionComparison = compareVersionNumber(majorVersion, o.majorVersion);
if (majorVersionComparison != 0) return majorVersionComparison;

if (minorVersion < o.minorVersion) return -1;
else if (minorVersion > o.minorVersion) return 1;
int minorVersionComparison = compareVersionNumber(minorVersion, o.minorVersion);
if (minorVersionComparison != 0) return minorVersionComparison;

if (patchVersion < o.patchVersion) return -1;
else if (patchVersion > o.patchVersion) return 1;
int patchVersionComparison = compareVersionNumber(patchVersion, o.patchVersion);
if (patchVersionComparison != 0) return patchVersionComparison;

int updateVersionComparison = compareVersionNumber(updateVersion, o.updateVersion);
if (updateVersionComparison != 0) return updateVersionComparison;

if (isNullOrEmpty(qualifier) && isNullOrEmpty(o.qualifier)) {
return 0;
Expand All @@ -89,20 +95,26 @@ public String toString() {
return completeVersion;
}

private String toFullyQualifiedVersion(String version) {
String newString = "0000000000".concat(version);
return newString.substring(newString.length() - 10);
private int compareVersionNumber(String myself, String other) {
if (myself.length() != other.length()) return myself.length() - other.length();
else if (myself.compareTo(other) < 0) return -1;
else if (myself.compareTo(other) > 0) return 1;
return 0;
}

public static VersionNumber of(String version) {
return new VersionNumber(version);
}

public static boolean isOlderOrEqualTo(String baseLine, String actualVersion) {
return of(baseLine).isOlderOrEqualTo(of(actualVersion));
public static boolean isOlderThan(String actualVersion, String baseLine) {
return of(actualVersion).isOlderThan(of(baseLine));
}

public static boolean isOlderOrEqualTo(String actualVersion, String baseLine) {
return of(actualVersion).isOlderOrEqualTo(of(baseLine));
}

public static boolean isNewerOrEqualTo(String actualVersion, String baseLine) {
return of(actualVersion).isNewerOrEqualTo(of(baseLine));
}
}
}