Skip to content

Commit

Permalink
Rework infra logic onto IntegrationProperties (#3881)
Browse files Browse the repository at this point in the history
* Rework infra logic onto IntegrationProperties

The `Properties` bean has been deprecated since `5.5`

* Register a default `integrationGlobalProperties` as a `IntegrationProperties`
* Fix respective usages from the `Properties`

* * Remove unused constant from the `IntegrationContextUtils`

* * Fix language in the `whats-new.adoc`

Co-authored-by: Gary Russell <grussell@vmware.com>

Co-authored-by: Gary Russell <grussell@vmware.com>
  • Loading branch information
artembilan and garyrussell committed Sep 6, 2022
1 parent 8d241c6 commit 15e89c3
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 158 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,7 +32,6 @@
import org.springframework.integration.MessageDispatchingException;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.dispatcher.AbstractDispatcher;
import org.springframework.integration.dispatcher.MessageDispatcher;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
Expand Down Expand Up @@ -166,10 +165,10 @@ public void onInit() {
super.onInit();
this.dispatcher = this.createDispatcher();
if (this.maxSubscribers == null) {
this.maxSubscribers = this.getIntegrationProperty(this.isPubSub ?
IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS :
IntegrationProperties.CHANNELS_MAX_UNICAST_SUBSCRIBERS,
Integer.class);
this.maxSubscribers =
this.isPubSub
? getIntegrationProperties().getChannelsMaxBroadcastSubscribers()
: getIntegrationProperties().getChannelsMaxUnicastSubscribers();
}
setMaxSubscribers(this.maxSubscribers);
String queue = obtainQueueName(this.channelName);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,6 @@

package org.springframework.integration.channel;

import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
Expand Down Expand Up @@ -86,10 +85,7 @@ protected UnicastingDispatcher getDispatcher() {
protected void onInit() {
super.onInit();
if (this.maxSubscribers == null) {
Integer max = getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_UNICAST_SUBSCRIBERS, Integer.class);
if (max != null) {
setMaxSubscribers(max);
}
setMaxSubscribers(getIntegrationProperties().getChannelsMaxUnicastSubscribers());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,11 +18,11 @@

import java.util.concurrent.Executor;

import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;

Expand All @@ -42,13 +42,14 @@
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*
* @since 1.0.3
*/
public class ExecutorChannel extends AbstractExecutorChannel {

private volatile boolean failover = true;
private final LoadBalancingStrategy loadBalancingStrategy;

private volatile LoadBalancingStrategy loadBalancingStrategy;
private boolean failover = true;

/**
* Create an ExecutorChannel that delegates to the provided
Expand All @@ -69,13 +70,13 @@ public ExecutorChannel(Executor executor) {
* @param executor The executor.
* @param loadBalancingStrategy The load balancing strategy implementation.
*/
public ExecutorChannel(Executor executor, LoadBalancingStrategy loadBalancingStrategy) {
public ExecutorChannel(Executor executor, @Nullable LoadBalancingStrategy loadBalancingStrategy) {
super(executor);
Assert.notNull(executor, "executor must not be null");
UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher(executor);
if (loadBalancingStrategy != null) {
this.loadBalancingStrategy = loadBalancingStrategy;
unicastingDispatcher.setLoadBalancingStrategy(loadBalancingStrategy);
this.loadBalancingStrategy = loadBalancingStrategy;
if (this.loadBalancingStrategy != null) {
unicastingDispatcher.setLoadBalancingStrategy(this.loadBalancingStrategy);
}
this.dispatcher = unicastingDispatcher;
}
Expand Down Expand Up @@ -108,8 +109,7 @@ public final void onInit() {
UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher(this.executor);
unicastingDispatcher.setFailover(this.failover);
if (this.maxSubscribers == null) {
this.maxSubscribers =
getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_UNICAST_SUBSCRIBERS, Integer.class);
this.maxSubscribers = getIntegrationProperties().getChannelsMaxUnicastSubscribers();
}
unicastingDispatcher.setMaxSubscribers(this.maxSubscribers);
if (this.loadBalancingStrategy != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,6 @@

import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.lang.Nullable;
Expand All @@ -38,10 +37,10 @@
*/
public class PublishSubscribeChannel extends AbstractExecutorChannel implements BroadcastCapableChannel {

private ErrorHandler errorHandler;

private final boolean requireSubscribers;

private ErrorHandler errorHandler;

private boolean ignoreFailures;

private boolean applySequence;
Expand Down Expand Up @@ -122,7 +121,7 @@ public void setErrorHandler(ErrorHandler errorHandler) {

/**
* Specify whether failures for one or more of the handlers should be
* ignored. By default this is false meaning that an Exception
* ignored. By default, this is false meaning that an Exception
* will be thrown whenever a handler fails. To override this and suppress
* Exceptions, set the value to true.
* @param ignoreFailures true if failures should be ignored.
Expand Down Expand Up @@ -188,9 +187,7 @@ else if (this.errorHandler != null) {
}

if (this.maxSubscribers == null) {
Integer maxSubscribers =
getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS, Integer.class);
setMaxSubscribers(maxSubscribers);
setMaxSubscribers(getIntegrationProperties().getChannelsMaxBroadcastSubscribers());
}
dispatcherToUse.setBeanFactory(beanFactory);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
@Override
public void afterSingletonsInstantiated() {
if (LOGGER.isDebugEnabled()) {
Properties integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
Properties integrationProperties =
IntegrationContextUtils.getIntegrationProperties(this.beanFactory)
.toProperties();

StringWriter writer = new StringWriter();
integrationProperties.list(new PrintWriter(writer));
Expand Down Expand Up @@ -237,11 +239,8 @@ private void registerErrorChannel() {
}

private PublishSubscribeChannel createErrorChannel() {
Properties integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
String requireSubscribers =
integrationProperties.getProperty(IntegrationProperties.ERROR_CHANNEL_REQUIRE_SUBSCRIBERS);

return new PublishSubscribeChannel(Boolean.parseBoolean(requireSubscribers));
IntegrationProperties integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
return new PublishSubscribeChannel(integrationProperties.isErrorChannelRequireSubscribers());
}

/**
Expand Down Expand Up @@ -318,16 +317,19 @@ private void registerTaskScheduler() {
*/
private void registerIntegrationProperties() {
if (!this.beanFactory.containsBean(IntegrationContextUtils.INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME)) {
// TODO Revise in favor of 'IntegrationProperties' instance in the next 6.0 version
BeanDefinitionBuilder integrationPropertiesBuilder =
BeanDefinitionBuilder.genericBeanDefinition(PropertiesFactoryBean.class,
PropertiesFactoryBean::new)
.setRole(BeanDefinition.ROLE_INFRASTRUCTURE)
.addPropertyValue("properties", IntegrationProperties.defaults())
.addPropertyValue("locations", "classpath*:META-INF/spring.integration.properties");
BeanDefinition userProperties =
BeanDefinitionBuilder.genericBeanDefinition(PropertiesFactoryBean.class, PropertiesFactoryBean::new)
.addPropertyValue("locations", "classpath*:META-INF/spring.integration.properties")
.getBeanDefinition();

BeanDefinition integrationProperties =
BeanDefinitionBuilder.genericBeanDefinition(IntegrationProperties.class)
.setFactoryMethod("parse")
.addConstructorArgValue(userProperties)
.getBeanDefinition();

this.registry.registerBeanDefinition(IntegrationContextUtils.INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME,
integrationPropertiesBuilder.getBeanDefinition());
integrationProperties);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,13 @@

package org.springframework.integration.context;

import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jetbrains.annotations.Nullable;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanNotOfRequiredTypeException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.expression.spel.support.SimpleEvaluationContext;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.metadata.MetadataStore;
Expand Down Expand Up @@ -70,8 +64,6 @@ public abstract class IntegrationContextUtils {

public static final String INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME = "integrationGlobalProperties";

public static final String MERGED_INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME = "mergedIntegrationGlobalProperties";

public static final String CHANNEL_INITIALIZER_BEAN_NAME = "channelInitializer";

public static final String AUTO_CREATE_CHANNEL_CANDIDATES_BEAN_NAME = "$autoCreateChannelCandidates";
Expand Down Expand Up @@ -179,8 +171,6 @@ private static <T> T getBeanOfType(BeanFactory beanFactory, String beanName, Cla
return beanFactory.getBean(beanName, type);
}

// TODO Revise in favor of 'IntegrationProperties' instance in the next 6.0 version

/**
* @param beanFactory The bean factory.
* @return the global {@link IntegrationContextUtils#INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME}
Expand All @@ -191,59 +181,16 @@ private static <T> T getBeanOfType(BeanFactory beanFactory, String beanName, Cla
* {@link IntegrationContextUtils#INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME} bean in the
* provided {@code #beanFactory} or provided {@code #beanFactory} is null.
*/
public static Properties getIntegrationProperties(BeanFactory beanFactory) {
Properties properties;
public static IntegrationProperties getIntegrationProperties(BeanFactory beanFactory) {
IntegrationProperties integrationProperties = null;
if (beanFactory != null) {
properties = getBeanOfType(beanFactory, MERGED_INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME, Properties.class);
if (properties == null) {
Properties propertiesToRegister = new Properties();
propertiesToRegister.putAll(IntegrationProperties.defaults());
Properties userProperties = obtainUserProperties(beanFactory);
if (userProperties != null) {
propertiesToRegister.putAll(userProperties);
}

if (beanFactory instanceof BeanDefinitionRegistry registry) {
RootBeanDefinition beanDefinition = new RootBeanDefinition(Properties.class);
beanDefinition.setInstanceSupplier(() -> propertiesToRegister);

registry.registerBeanDefinition(MERGED_INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME, beanDefinition);
}

properties = propertiesToRegister;
}
}
else {
properties = new Properties();
properties.putAll(IntegrationProperties.defaults());
}
return properties;
}

@Nullable
private static Properties obtainUserProperties(BeanFactory beanFactory) {
Object userProperties = getBeanOfType(beanFactory, INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME, Object.class);
if (userProperties instanceof Properties) {
if (BeanDefinition.ROLE_INFRASTRUCTURE !=
((BeanDefinitionRegistry) beanFactory)
.getBeanDefinition(INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME)
.getRole()) {

LOGGER.warn("The 'integrationGlobalProperties' bean must be declared as an instance of " +
"'org.springframework.integration.context.IntegrationProperties'. " +
"A 'java.util.Properties' support as a bean is deprecated for " +
"'integrationGlobalProperties' since version 5.5.");
}
return (Properties) userProperties;
}
else if (userProperties instanceof IntegrationProperties) {
return ((IntegrationProperties) userProperties).toProperties();
integrationProperties =
getBeanOfType(beanFactory, INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME, IntegrationProperties.class);
}
else if (userProperties != null) {
throw new BeanNotOfRequiredTypeException(INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME,
IntegrationProperties.class, userProperties.getClass());
if (integrationProperties == null) {
integrationProperties = IntegrationProperties.DEFAULT_INSTANCE;
}
return null;
return integrationProperties;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,6 @@

package org.springframework.integration.context;

import java.util.Properties;
import java.util.UUID;

import org.springframework.aop.framework.AopProxyUtils;
Expand Down Expand Up @@ -86,7 +85,7 @@ public abstract class IntegrationObjectSupport implements BeanNameAware, NamedCo

private TaskScheduler taskScheduler;

private Properties integrationProperties = IntegrationProperties.defaults();
private IntegrationProperties integrationProperties = new IntegrationProperties();

private ConversionService conversionService;

Expand Down Expand Up @@ -290,12 +289,9 @@ protected ApplicationContext getApplicationContext() {

/**
* @return The global integration properties.
* @deprecated since version 5.5 in favor of {@link #getIntegrationProperty(String, Class)};
* will be replaced with {@link IntegrationProperties} variant in the next major version.
* @see IntegrationContextUtils#getIntegrationProperties(BeanFactory)
*/
@Deprecated
protected Properties getIntegrationProperties() {
protected IntegrationProperties getIntegrationProperties() {
return this.integrationProperties;
}

Expand All @@ -315,9 +311,11 @@ public void setMessageBuilderFactory(MessageBuilderFactory messageBuilderFactory
* @param tClass the class to convert a value of Integration property.
* @param <T> The expected type of the property.
* @return the value of the Integration property converted to the provide type.
* @deprecated in favor of {@link #getIntegrationProperties()}
*/
@Deprecated(since = "6.0")
protected <T> T getIntegrationProperty(String key, Class<T> tClass) {
return this.defaultConversionService.convert(this.integrationProperties.getProperty(key), tClass);
return this.defaultConversionService.convert(this.integrationProperties.toProperties().getProperty(key), tClass);
}

@Override
Expand Down

0 comments on commit 15e89c3

Please sign in to comment.