Skip to content

Commit

Permalink
GH-878 Fix concurrency issue during registration and lookup of functions
Browse files Browse the repository at this point in the history
Resolves #878
  • Loading branch information
olegz committed May 30, 2022
1 parent 29b4fa8 commit 9b6952f
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,50 +112,52 @@ public <T> T lookup(Class<?> type, String functionDefinition, String... expected
return null;
}
FunctionInvocationWrapper function = this.doLookup(type, functionDefinition, expectedOutputMimeTypes);

if (function == null) {
Set<String> functionRegistratioinNames = super.getNames(null);
String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");
for (String functionName : functionNames) {
if (functionRegistratioinNames.contains(functionName) && logger.isDebugEnabled()) {
logger.debug("Skipping function '" + functionName + "' since it is already present");
}
else {
Object functionCandidate = this.discoverFunctionInBeanFactory(functionName);
if (functionCandidate != null) {
Type functionType = null;
FunctionRegistration functionRegistration = null;
if (functionCandidate instanceof FunctionRegistration) {
functionRegistration = (FunctionRegistration) functionCandidate;
}
else if (this.isFunctionPojo(functionCandidate, functionName)) {
Method functionalMethod = FunctionTypeUtils.discoverFunctionalMethod(functionCandidate.getClass());
functionCandidate = this.proxyTarget(functionCandidate, functionalMethod);
functionType = FunctionTypeUtils.fromFunctionMethod(functionalMethod);
}
else if (this.isSpecialFunctionRegistration(functionNames, functionName)) {
functionRegistration = this.applicationContext
.getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class);
}
else {
functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext);
}
if (functionRegistration == null) {
functionRegistration = new FunctionRegistration(functionCandidate, functionName).type(functionType);
}
// Certain Kafka Streams functions such as KStream[] return types could be null (esp when using Kotlin).
if (functionRegistration != null) {
this.register(functionRegistration);
}
Object syncInstance = functionDefinition == null ? this : functionDefinition;
synchronized (syncInstance) {
if (function == null) {
Set<String> functionRegistratioinNames = super.getNames(null);
String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");
for (String functionName : functionNames) {
if (functionRegistratioinNames.contains(functionName) && logger.isDebugEnabled()) {
logger.debug("Skipping function '" + functionName + "' since it is already present");
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Function '" + functionName + "' is not available in FunctionCatalog or BeanFactory");
Object functionCandidate = this.discoverFunctionInBeanFactory(functionName);
if (functionCandidate != null) {
Type functionType = null;
FunctionRegistration functionRegistration = null;
if (functionCandidate instanceof FunctionRegistration) {
functionRegistration = (FunctionRegistration) functionCandidate;
}
else if (this.isFunctionPojo(functionCandidate, functionName)) {
Method functionalMethod = FunctionTypeUtils.discoverFunctionalMethod(functionCandidate.getClass());
functionCandidate = this.proxyTarget(functionCandidate, functionalMethod);
functionType = FunctionTypeUtils.fromFunctionMethod(functionalMethod);
}
else if (this.isSpecialFunctionRegistration(functionNames, functionName)) {
functionRegistration = this.applicationContext
.getBean(functionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, FunctionRegistration.class);
}
else {
functionType = FunctionTypeUtils.discoverFunctionType(functionCandidate, functionName, this.applicationContext);
}
if (functionRegistration == null) {
functionRegistration = new FunctionRegistration(functionCandidate, functionName).type(functionType);
}
// Certain Kafka Streams functions such as KStream[] return types could be null (esp when using Kotlin).
if (functionRegistration != null) {
this.register(functionRegistration);
}
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Function '" + functionName + "' is not available in FunctionCatalog or BeanFactory");
}
}
}
}
function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes);
}
function = super.doLookup(type, functionDefinition, expectedOutputMimeTypes);
}

return (T) function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ private FunctionInvocationWrapper findFunctionInFunctionRegistrations(String fun
/*
*
*/
private synchronized FunctionInvocationWrapper compose(Class<?> type, String functionDefinition) {
private FunctionInvocationWrapper compose(Class<?> type, String functionDefinition) {
String[] functionNames = StringUtils.delimitedListToStringArray(functionDefinition.replaceAll(",", "|").trim(), "|");
FunctionInvocationWrapper composedFunction = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
Expand Down Expand Up @@ -102,6 +103,27 @@ public void before() {
System.clearProperty("spring.cloud.function.definition");
}


@SuppressWarnings({ "rawtypes" })
@Test
public void concurrencyLookupTest() throws Exception {
FunctionCatalog catalog = this.configureCatalog();
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
executor.execute(() -> {
catalog.lookup("uppercase", "application/json");
});
executor.execute(() -> {
catalog.lookup("numberword", "application/json");
});
}
Thread.sleep(1000);
Field frField = ReflectionUtils.findField(catalog.getClass(), "functionRegistrations");
frField.setAccessible(true);
Collection c = (Collection) frField.get(catalog);
assertThat(c.size()).isEqualTo(2);
}

@SuppressWarnings("unchecked")
@Test
public void testDefaultLookup() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@

package org.springframework.cloud.function.context.catalog;

import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -67,6 +71,7 @@
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeType;
import org.springframework.util.ReflectionUtils;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -92,6 +97,27 @@ public void before() {
this.conversionService = new DefaultConversionService();
}

@SuppressWarnings("rawtypes")
@Test
public void concurrencyRegistrationTest() throws Exception {
Echo function = new Echo();
FunctionRegistration<Echo> registration = new FunctionRegistration<>(
function, "echo").type(FunctionType.of(Echo.class));
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter,
new JacksonMapper(new ObjectMapper()));
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 1000; i++) {
executor.execute(() -> {
catalog.register(registration);
});
}
Thread.sleep(1000);
Field frField = ReflectionUtils.findField(catalog.getClass(), "functionRegistrations");
frField.setAccessible(true);
Collection c = (Collection) frField.get(catalog);
assertThat(c.size()).isEqualTo(1);
}

@Test
public void testCachingOfFunction() {
Echo function = new Echo();
Expand Down

0 comments on commit 9b6952f

Please sign in to comment.