Aggregator - Orphaned Groups Don't Time Out After Restart #3334

@garyrussell

In what version(s) of Spring Integration are you seeing this issue?

All

Describe the bug

When using an external (persistent) message group store, groups left in the store by a previous run (orphaned groups) are not timed out after a restart.
https://stackoverflow.com/questions/62748832

Expected behavior

When a group timeout is configured and the group store is persistent, the aggregator should, on startup, schedule a timeout for every existing group that it created. We don't currently have a mechanism to determine which aggregator created a group; this matters when the same store is shared by different aggregators, because we shouldn't schedule a timeout for groups we don't "own".

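import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.redis.store.RedisMessageStore;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
public class So62748832Application {

	public static void main(String[] args) throws InterruptedException {
		ConfigurableApplicationContext ctx = SpringApplication.run(So62748832Application.class, args);
		// Shut down after 10 seconds, i.e. before the 15-second group timeout below can fire.
		Thread.sleep(10000);
		ctx.close();
	}

	@Bean
	public IntegrationFlow flow(RedisConnectionFactory cf) {
		// Every message correlates to the same group ('foo'), which is kept in Redis.
		return f -> f.aggregate(agg -> agg.correlationExpression("'foo'")
				.messageStore(store(cf))
				.sendPartialResultOnExpiry(true)
				.groupTimeout(15000))
				.handle(System.out::println);
	}

	@Bean
	MessageGroupStore store(RedisConnectionFactory connectionFactory) {
		return new RedisMessageStore(connectionFactory);
	}

	// Uncomment @Bean for the first run (sends one message); comment it out for the second run.
//	@Bean
	@DependsOn("flow")
	public ApplicationRunner runner(@Qualifier("flow.input") MessageChannel channel) {
		return args -> channel.send(new GenericMessage<>("foo"));
	}

}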

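Run it once with the @Bean annotation on the runner uncommented: the application sends one message and exits after 10 seconds, before the 15-second group timeout fires, so the group stays in Redis. Run it again with @Bean commented out: the orphaned group is never timed out.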
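
To make the ownership concern under "Expected behavior" concrete, here is a minimal, framework-free sketch of what a startup pass might compute. It is not Spring Integration API and not a proposed implementation: the class `OrphanedGroupTimeouts`, the method `remainingDelays`, and the idea of marking ownership with a group-id prefix such as `aggA:` are all assumptions made up for illustration. The store is modelled as a plain map from group id to creation timestamp.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch only; none of these names exist in Spring Integration. */
public class OrphanedGroupTimeouts {

    /**
     * For each stored group whose id starts with ownerPrefix (assumed ownership marker),
     * returns the delay in milliseconds after which its timeout should fire.
     * Groups that are already overdue get a delay of 0.
     */
    public static Map<String, Long> remainingDelays(Map<String, Long> groupCreatedAt,
            String ownerPrefix, long groupTimeoutMs, long nowMs) {
        Map<String, Long> delays = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : groupCreatedAt.entrySet()) {
            if (!entry.getKey().startsWith(ownerPrefix)) {
                continue; // owned by another aggregator sharing the store; leave it alone
            }
            long remaining = entry.getValue() + groupTimeoutMs - nowMs;
            delays.put(entry.getKey(), Math.max(0L, remaining));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Simulated contents of a shared persistent store after a restart.
        Map<String, Long> stored = new LinkedHashMap<>();
        stored.put("aggA:foo", 1_000L);   // old group, already past its timeout
        stored.put("aggA:bar", 20_000L);  // recent group, timeout still pending
        stored.put("aggB:foo", 1_000L);   // belongs to a different aggregator

        // prints {aggA:foo=0, aggA:bar=14000}
        System.out.println(remainingDelays(stored, "aggA:", 15_000L, 21_000L));
    }
}
```

In this sketch the group from the other aggregator is skipped entirely, which is the behavior the issue asks for; how ownership would actually be recorded in a real message group store is left open here.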
