Skip to content

Commit

Permalink
GH-112: Add KinesisShardEndedEvent to KclMDChA
Browse files Browse the repository at this point in the history
Fixes #112
  • Loading branch information
artembilan committed Mar 31, 2023
1 parent 75cd725 commit 10b94e3
Showing 1 changed file with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,16 @@
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RetrievalConfig;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.AttributeAccessor;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.log.LogMessage;
import org.springframework.core.serializer.support.DeserializingConverter;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.aws.event.KinesisShardEndedEvent;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mapping.InboundMessageMapper;
Expand All @@ -86,7 +89,8 @@
*/
@ManagedResource
@IntegrationManagedResource
public class KclMessageDrivenChannelAdapter extends MessageProducerSupport {
public class KclMessageDrivenChannelAdapter extends MessageProducerSupport
implements ApplicationEventPublisherAware {

private static final ThreadLocal<AttributeAccessor> attributesHolder = new ThreadLocal<>();

Expand Down Expand Up @@ -127,6 +131,8 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport {

private boolean bindSourceRecord;

private ApplicationEventPublisher applicationEventPublisher;

private volatile Scheduler scheduler;

public KclMessageDrivenChannelAdapter(String... streams) {
Expand All @@ -153,6 +159,11 @@ public KclMessageDrivenChannelAdapter(KinesisAsyncClient kinesisClient, CloudWat
this.dynamoDBClient = dynamoDBClient;
}

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}

public void setExecutor(TaskExecutor executor) {
Assert.notNull(executor, "'executor' must not be null.");
this.executor = executor;
Expand Down Expand Up @@ -407,6 +418,11 @@ public void shardEnded(ShardEndedInput shardEndedInput) {
catch (ShutdownException | InvalidStateException ex) {
logger.error(ex, "Exception while checkpointing at requested shutdown. Giving up");
}

if (KclMessageDrivenChannelAdapter.this.applicationEventPublisher != null) {
KclMessageDrivenChannelAdapter.this.applicationEventPublisher.publishEvent(
new KinesisShardEndedEvent(KclMessageDrivenChannelAdapter.this, this.shardId));
}
}

@Override
Expand Down

0 comments on commit 10b94e3

Please sign in to comment.