Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2002-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.core.concurrency;

import org.jspecify.annotations.Nullable;

import org.springframework.util.ConcurrencyThrottleSupport;

/**
* Template-style API that throttles concurrent executions of user-provided callbacks
* according to a configurable concurrency limit.
*
* <p>Blocking semantics are identical to {@link ConcurrencyThrottleSupport}:
* when the configured limit is reached, additional callers will block until a
* permit becomes available.
*
* <p>The default concurrency limit of this template is 1.
*
* @author Geonhu Park
* @since 7.1
* @see ConcurrencyThrottleSupport
*/
@SuppressWarnings("serial")
public class ConcurrencyLimitTemplate extends ConcurrencyThrottleSupport {

/**
* Create a default {@code ConcurrencyLimitTemplate}
* with concurrency limit 1.
*/
public ConcurrencyLimitTemplate() {
this(1);
}

/**
* Create a {@code ConcurrencyThrottleInterceptor}
* with the given concurrency limit.
*/
public ConcurrencyLimitTemplate(int concurrencyLimit) {
setConcurrencyLimit(concurrencyLimit);
}

/**
* Execute the supplied callback under the configured concurrency limit.
* @param concurrencyLimited the unit of work to run
* @param <R> the result type (nullable)
* @return the callback's result (possibly {@code null})
* @throws Throwable any exception thrown by the callback
*/
public <R extends @Nullable Object> @Nullable R execute(ConcurrencyLimited<R> concurrencyLimited) throws Throwable {
beforeAccess();
try {
return concurrencyLimited.execute();
}
finally {
afterAccess();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2002-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.core.concurrency;

import org.jspecify.annotations.Nullable;

/**
* Functional callback representing a single unit of work to be executed under
* the concurrency throttling of a {@link ConcurrencyLimitTemplate}.
*
* @author Geonhu Park
* @since 7.1
* @param <R> the result type (nullable)
* @see ConcurrencyLimitTemplate
*/
@FunctionalInterface
public interface ConcurrencyLimited<R extends @Nullable Object> {

/**
* Execute the concurrency-limited operation.
* @return the result (may be {@code null})
* @throws Throwable any error from the underlying operation
*/
R execute() throws Throwable;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/**
* Concurrency limiting (throttling) support via {@link org.springframework.core.concurrency.ConcurrencyLimitTemplate}
* and {@link org.springframework.core.concurrency.ConcurrencyLimited}.
*/
package org.springframework.core.concurrency;
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* @see #beforeAccess()
* @see #afterAccess()
* @see org.springframework.aop.interceptor.ConcurrencyThrottleInterceptor
* @see org.springframework.core.concurrency.ConcurrencyLimitTemplate
* @see java.io.Serializable
*/
@SuppressWarnings("serial")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2002-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.core.concurrency;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
* Tests for {@link ConcurrencyLimitTemplate}.
*
* @author Geonhu Park
*/
class ConcurrencyLimitTemplateTests {

private static final Log logger = LogFactory.getLog(ConcurrencyLimitTemplateTests.class);

private static final int NR_OF_THREADS = 100;

private static final int NR_OF_ITERATIONS = 1000;

@ParameterizedTest
@ValueSource(ints = {1, 10})
void multipleThreadsWithLimit(int concurrencyLimit) {
ConcurrencyLimitTemplate template = new ConcurrencyLimitTemplate(concurrencyLimit);

Thread[] threads = new Thread[NR_OF_THREADS];
for (int i = 0; i < NR_OF_THREADS; i++) {
threads[i] = new ConcurrencyThread(template, null);
threads[i].start();
}
for (int i = 0; i < NR_OF_THREADS / 10; i++) {
try {
Thread.sleep(5);
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
threads[i] = new ConcurrencyThread(template,
(i % 2 == 0 ? new OutOfMemoryError() : new IllegalStateException()));
threads[i].start();
}
for (Thread t : threads) {
try {
t.join();
}
catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}

private static class ConcurrencyThread extends Thread {

private final ConcurrencyLimitTemplate template;
private final Throwable ex;

ConcurrencyThread(ConcurrencyLimitTemplate template, Throwable ex) {
this.template = template;
this.ex = ex;
}

@Override
public void run() {
if (this.ex != null) {
try {
this.template.execute(() -> {
throw this.ex;
});
}
catch (RuntimeException | Error err) {
if (err == this.ex) {
logger.info("Expected exception thrown", err);
}
else {
ex.printStackTrace();
}
}
catch (Throwable th) {
th.printStackTrace();
}
}
else {
for (int i = 0; i < NR_OF_ITERATIONS; i++) {
try {
this.template.execute(() -> null);
}
catch (Throwable th) {
th.printStackTrace();
break;
}
}
}
}
}
}