Skip to content

Spring reactor adaptative concurrency poller for SQS consumer workloads

License

Notifications You must be signed in to change notification settings

juancarrey/reactor-poller

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

13 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Reactor Poller

A reactive SQS Poller (Any other polling source can be implemented) that adapts the concurrency to the response:

  • When there is messages, concurrency can grow in different strategies:
    • fastest: When there is messages grow to MAX, when there is no messages for a concurrent 'worker' scale down by 1.
    • linear: When there is messages grow by +N, when there is no messages for a concurrent 'worker' scale down by 1.
    • exponential: When there is messages grow by *N, when there is no messages for a concurrent 'worker' scale down by 1.

Installation

This library is published to maven central, you can use either raw reactor-poller-core or reactor-poller-sqs.

Reactor Poller Core Reactor Poller SQS

<dependency>
    <groupId>com.jcarrey</groupId>
    <artifactId>reactor-poller-core</artifactId>
    <version>0.2.0</version>
</dependency>
implementation 'com.jcarrey:reactor-poller-core:0.2.0'
<dependency>
    <groupId>com.jcarrey</groupId>
    <artifactId>reactor-poller-sqs</artifactId>
    <version>0.2.0</version>
</dependency>
implementation 'com.jcarrey:reactor-poller-sqs:0.2.0'

Sample usages

Raw API usage sample

This is raw usage sample, that could be extended to any polling source

var random = new Random();
Poller<Integer> poller = () -> Mono.fromSupplier(() -> random.nextInt(3));
var options = ConcurrencyControlOptions.<Integer>builder()
  .initialConcurrency(1)
  .maxConcurrency(10)
  .minConcurrency(1)
  .strategy(value -> switch (value) {
     case 0 -> ConcurrencyControlOperation.ScaleUp;
     case 1 -> ConcurrencyControlOperation.ScaleDown;
     default -> ConcurrencyControlOperation.Noop;
  })
  .scaleUpFn(ConcurrencyControlFunctions.max())
  .scaleDownFn(ConcurrencyControlFunctions.max())
  .lockMechanism(ConcurrencyLockMechanism.Pessimistic)
  .build();

ReactorPoller.adaptative(poller, options).subscribe();

SQS API usage sample

var sqsClient = SqsAsyncClient.builder().build();
var queueUrl = "....";
var receiveRequest =  ReceiveMessageRequest.builder()
  .queueUrl(queueUrl)
  .maxNumberOfMessages(10)
  .waitTimeSeconds(20)
  .build();

var options = ConcurrencyControlOptions.<ReceiveMessageResponse>builder()
  .initialConcurrency(1d)
  .maxConcurrency(10d)
  .minConcurrency(1d)
  .strategy(SqsStrategies.thresholdScaleUp(8))
  .scaleUpFn(ConcurrencyControlFunctions.max())
  .scaleDownFn(ConcurrencyControlFunctions.max())
  .lockMechanism(ConcurrencyLockMechanism.None)
  .build();

  ReactorPoller.adaptative(new SqsPoller(sqsClient, receiveRequest), options).subscribe();