Skip to content

Commit

Permalink
feat: Add max-async configuration property (#196)
Browse files Browse the repository at this point in the history
* feat: Added max-async configuration property

* chore: adapt test code to be more readable and change configuration type

---------

Co-authored-by: Simon Krause <simon.krause@movisens.com>
  • Loading branch information
Nplu5 and Simon Krause committed Apr 18, 2023
1 parent c440721 commit 4c590c8
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 40 deletions.
79 changes: 42 additions & 37 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# Quarkus JBeret Extension

<!-- ALL-CONTRIBUTORS-BADGE:START - Do not remove or modify this section -->

[![Build](https://github.com/quarkiverse/quarkiverse-jberet/workflows/Build/badge.svg?branch=main)](https://github.com/quarkiverse/quarkiverse-jberet/actions?query=workflow%3ABuild)
[![License](https://img.shields.io/github/license/quarkiverse/quarkiverse-jberet.svg)](http://www.apache.org/licenses/LICENSE-2.0)
[![Central](https://img.shields.io/maven-central/v/io.quarkiverse.jberet/quarkus-jberet-parent?color=green)](https://search.maven.org/search?q=a:quarkus-jberet-parent)
[![All Contributors](https://img.shields.io/badge/all_contributors-1-green.svg)](#contributors-)

<!-- ALL-CONTRIBUTORS-BADGE:END -->

The Quarkus JBeret Extension adds support for
[JSR-352 Batch Applications for the Java Platform](https://jcp.org/en/jsr/detail?id=352).
The Quarkus JBeret Extension adds support for
[JSR-352 Batch Applications for the Java Platform](https://jcp.org/en/jsr/detail?id=352).
[JBeret](https://github.com/jberet) is an implementation of the JSR-352.

## Usage
Expand All @@ -24,29 +27,30 @@ To use the extension, add the dependency to the target project:

:information_source: **Recommended Quarkus version: `3.0.0.Alpha4` or higher**

The Batch API and Runtime will be available out of the box. Please refer to the
[Batch documentation](https://jcp.org/en/jsr/detail?id=352), or the
[JBeret documentation](https://jberet.gitbooks.io/jberet-user-guide/content/) to learn about Batch Applications.
The Batch API and Runtime will be available out of the box. Please refer to the
[Batch documentation](https://jcp.org/en/jsr/detail?id=352), or the
[JBeret documentation](https://jberet.gitbooks.io/jberet-user-guide/content/) to learn about Batch Applications.

## Configuration

The JBeret Quarkus extension supports the following configuration:

| Name | Type | Default |
|---|---|---|
| `quarkus.jberet.repository`<br>The repository type to store JBeret and Job data. A `jdbc` type requires a JDBC datasource. | `in-memory`, `jdbc` | `in-memory` |
| `quarkus.jberet.repository.jdbc.datasource`<br>The datasource name. | string | `<default>` |
| `quarkus.jberet.jobs.includes`<br>A list of patterns to match batch files to include. | list of string | |
| `quarkus.jberet.jobs.excludes`<br>A list of patterns to match batch files to exclude. | list of string | |
| `quarkus.jberet.job."job-name".cron`<br>A cron style expression in Quartz format to schedule the job. | string | |
| `quarkus.jberet.job."job-name".params."param-key"`<br>A parameter to start a job. | string | |

| Name | Type | Default |
| ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------- | -------------------------- |
| `quarkus.jberet.repository`<br>The repository type to store JBeret and Job data. A `jdbc` type requires a JDBC datasource. | `in-memory`, `jdbc` | `in-memory` |
| `quarkus.jberet.repository.jdbc.datasource`<br>The datasource name. | string | `<default>` |
| `quarkus.jberet.jobs.includes`<br>A list of patterns to match batch files to include. | list of string | |
| `quarkus.jberet.jobs.excludes`<br>A list of patterns to match batch files to exclude. | list of string | |
| `quarkus.jberet.job."job-name".cron`<br>A cron style expression in Quartz format to schedule the job. | string | |
| `quarkus.jberet.job."job-name".params."param-key"`<br>A parameter to start a job. | string | |
| `quarkus.jberet.max-async"`<br>A parameter control the number of Threads that can be used by JBeret. An additional Thread for JBeret coordination is always added. Thus setting 1 will proide one thread for job executions. | string | `Based on available cores` |

## Non-standard Features

### Simplified Configuration

The Batch API requires the `@BatchProperty` annotation to inject the specific configuration from the batch definition
file. Instead, you can use the `@ConfigProperty` annotation, which is used to inject configuration properties in
The Batch API requires the `@BatchProperty` annotation to inject the specific configuration from the batch definition
file. Instead, you can use the `@ConfigProperty` annotation, which is used to inject configuration properties in
Quarkus using the MicroProfile Config API and keep consistency:

```java
Expand All @@ -59,13 +63,13 @@ String batchConfig;
Optional<String> mpConfig;
```

Although, there is a slight limitation: since job configuration is mostly dynamic and only injected on job execution,
Quarkus may fail to start due to invalid configuration (can't find the Job configuration values). In this case,
configuration injection points with the `@ConfigProperty` annotation need to set a default value or use an `Optional`.
Although, there is a slight limitation: since job configuration is mostly dynamic and only injected on job execution,
Quarkus may fail to start due to invalid configuration (can't find the Job configuration values). In this case,
configuration injection points with the `@ConfigProperty` annotation need to set a default value or use an `Optional`.

### CDI Beans

The Batch APIs `JobOperator` and `JobRepository` are available as CDI beans, so they can be injected directly into any
The Batch APIs `JobOperator` and `JobRepository` are available as CDI beans, so they can be injected directly into any
code:

```java
Expand Down Expand Up @@ -94,18 +98,18 @@ void start() {
.batchlet("programmaticBatchlet")
.build())
.build();

long executionId = jobOperator.start(job, new Properties());
JobExecution jobExecution = jobOperator.getJobExecution(executionId);
}
```

With `QuarkusJobOperator` it is possible to define and start programmatic Jobs, with the
With `QuarkusJobOperator` it is possible to define and start programmatic Jobs, with the
[JBeret Programmatic Job Definition](https://jberet.gitbooks.io/jberet-user-guide/content/programmatic_job_definition_with_java/).

### Scheduler

The [JBeret Scheduler](https://github.com/jberet/jberet-schedule) is integrated out of the box in this extension.
The [JBeret Scheduler](https://github.com/jberet/jberet-schedule) is integrated out of the box in this extension.

To schedule a Job execution, please refer to the `quarkus.jberet.job."job-name".cron` and
`quarkus.jberet.job."job-name".params."param-key"` configurations.
Expand All @@ -129,11 +133,11 @@ public class Scheduler {
}
```

The `JobScheduler` does not support persistent schedules.
The `JobScheduler` does not support persistent schedules.

### REST API

The [JBeret REST](https://github.com/jberet/jberet-rest) is integrated as separate extension that can be easily added
The [JBeret REST](https://github.com/jberet/jberet-rest) is integrated as separate extension that can be easily added
to the target project with the following dependency:

```xml
Expand All @@ -144,8 +148,8 @@ to the target project with the following dependency:
</dependency>
```

The [JBeret REST](https://github.com/jberet/jberet-rest) API, provides REST resources to several operations around the
Batch API: starting and stopping jobs, querying the status of a job, schedule a job, and many more. The extension
The [JBeret REST](https://github.com/jberet/jberet-rest) API, provides REST resources to several operations around the
Batch API: starting and stopping jobs, querying the status of a job, schedule a job, and many more. The extension
includes a REST client to simplify the REST API calls:

```java
Expand All @@ -156,27 +160,27 @@ void start() throws Exception {
JobExecutionEntity jobExecutionEntity = batchClient.startJob("batchlet", new Properties());
}
```

## Example Applications

Example applications can be found inside the `integration-tests` folder:

* `chunk` - A simple Job that reads, processes, and stores data from a file.
* `jdbc-repository` - A Job that uses a `jdbc` datasource to store JBeret and Job metadata.
* `scheduler` - Schedule a Job to run every 10 seconds
- `chunk` - A simple Job that reads, processes, and stores data from a file.
- `jdbc-repository` - A Job that uses a `jdbc` datasource to store JBeret and Job metadata.
- `scheduler` - Schedule a Job to run every 10 seconds

Or take a look into the [World of Warcraft Auctions - Batch Application](https://github.com/radcortez/wow-auctions). It
downloads the World of Warcraft Auction House data and provides statistics about items prices.
Or take a look into the [World of Warcraft Auctions - Batch Application](https://github.com/radcortez/wow-auctions). It
downloads the World of Warcraft Auction House data and provides statistics about items prices.

## Native Image Limitations

The Quakus JBeret Extension fully supports the Graal VM Native Image with the following exceptions:

* [Scripting Languages](https://jberet.gitbooks.io/jberet-user-guide/content/develop_batch_artifacts_in_script_languages/).
While `Javascript` should work, it is unlikely that other scripting languages will be supported in
[Graal](https://github.com/oracle/graaljs/blob/master/docs/user/ScriptEngine.md) via JSR-223.
- [Scripting Languages](https://jberet.gitbooks.io/jberet-user-guide/content/develop_batch_artifacts_in_script_languages/).
While `Javascript` should work, it is unlikely that other scripting languages will be supported in
[Graal](https://github.com/oracle/graaljs/blob/master/docs/user/ScriptEngine.md) via JSR-223.

## Contributors ✨____
## Contributors ✨\_\_\_\_

Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)):

Expand All @@ -191,6 +195,7 @@ Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/d

<!-- markdownlint-enable -->
<!-- prettier-ignore-end -->

<!-- ALL-CONTRIBUTORS-LIST:END -->

This project follows the [all-contributors](https://github.com/all-contributors/all-contributors) specification. Contributions of any kind welcome!
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package io.quarkiverse.jberet.deployment;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.LocalTime;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import jakarta.batch.api.Batchlet;
import jakarta.batch.api.listener.AbstractJobListener;
import jakarta.batch.runtime.BatchStatus;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import jakarta.inject.Named;

import org.jberet.job.model.JobBuilder;
import org.jberet.job.model.StepBuilder;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkiverse.jberet.runtime.QuarkusJobOperator;
import io.quarkus.logging.Log;
import io.quarkus.test.QuarkusUnitTest;

public class MaxAsyncJobExecutorConfigTest {

@RegisterExtension
static QuarkusUnitTest TEST = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(ConfigParamsTest.ConfigBatchlet.class)
.addAsResource(new StringAsset("quarkus.jberet.max-async=1"),
"application.properties"));

@Inject
QuarkusJobOperator quarkusJobOperator;

@Test
void runsJobsConsecutivelyWhenThreadLimitIsOne() {
List<Long> jobIds = Stream.of("job 1", "job 2", "job 3")
.map(jobName -> new JobBuilder(jobName)
.listener("threadJobListener")
.step(new StepBuilder("dummyStep")
.batchlet("blocking")
.build())
.build())
.map(job -> quarkusJobOperator.start(job, new Properties()))
.collect(Collectors.toList());

await("All jobs finished").atMost(3, TimeUnit.SECONDS).until(() -> jobIds.stream()
.map(jobId -> quarkusJobOperator.getJobExecution(jobId).getBatchStatus())
.filter(BatchStatus.COMPLETED::equals)
.count() == 3);

assertTrue(ThreadCounter.maxParallelRunningJobCounter > 0);
assertTrue(ThreadCounter.maxParallelRunningJobCounter < 2);
}

@Named("blocking")
@Dependent
public static class BlockingBatchlet implements Batchlet {

@Override
public String process() throws InterruptedException {
Log.debug(LocalTime.now() + " Executing");
Thread.sleep(700);
Log.debug(LocalTime.now() + " Execution finished");
return BatchStatus.COMPLETED.toString();
}

@Override
public void stop() {
}
}

@Named
@Dependent
public static class ThreadJobListener extends AbstractJobListener {
@Override
public void beforeJob() {
ThreadCounter.incrementJobCounter();
}

@Override
public void afterJob() {
ThreadCounter.decrementJobCounter();
}
}

public static class ThreadCounter {
private static volatile int runningJobsCounter = 0;
private static volatile int maxParallelRunningJobCounter = 0;

public static synchronized void incrementJobCounter() {
runningJobsCounter++;
if (runningJobsCounter > maxParallelRunningJobCounter) {
maxParallelRunningJobCounter = runningJobsCounter;
}

}

public static synchronized void decrementJobCounter() {
runningJobsCounter--;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public interface JBeretConfig {
*/
Repository repository();

/**
*
*/
Optional<Integer> maxAsync();

interface JobConfig {
/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void initJobOperator(final JBeretConfig config, final ThreadPoolConfig th
JobRepository jobRepository = beanContainer.beanInstance(
JobRepository.class);

JobExecutor quarkusJobExecutor = new QuarkusJobExecutor(managedExecutor, threadPoolConfig);
JobExecutor quarkusJobExecutor = new QuarkusJobExecutor(managedExecutor, threadPoolConfig, config);

JBeretDataHolder.JBeretData data = JBeretDataHolder.getData();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,22 @@
import org.jberet.spi.JobExecutor;
import org.wildfly.common.cpu.ProcessorInfo;

import io.quarkus.logging.Log;
import io.quarkus.runtime.ThreadPoolConfig;

class QuarkusJobExecutor extends JobExecutor {
private final int maxPoolSize;

public QuarkusJobExecutor(Executor delegate, final ThreadPoolConfig threadPoolConfig) {
public QuarkusJobExecutor(Executor delegate, final ThreadPoolConfig threadPoolConfig, final JBeretConfig config) {
super(delegate);
this.maxPoolSize = threadPoolConfig.maxThreads.orElse(Math.max(8 * ProcessorInfo.availableProcessors(), 200));

this.maxPoolSize = config.maxAsync().map(maxAsync -> {
if (maxAsync < 1) {
Log.error("max-async value must be 1 or greater if set.");
return null;
}
return maxAsync + 1; // Adapt for the fact that JBeret requires one thread for coordination.
}).orElse(threadPoolConfig.maxThreads.orElse(Math.max(8 * ProcessorInfo.availableProcessors(), 200)));
}

@Override
Expand Down

0 comments on commit 4c590c8

Please sign in to comment.