-
Notifications
You must be signed in to change notification settings - Fork 167
Description
Hi,
I've started a new app using spring-cloud-stream-binder-aws-kinesis. One issue I'm running into if I try to receive JSON objects is that parsing errors result in a runtime exception that causes the consumer thread to die. I'm using spring.cloud.stream.bindings.input.consumer.concurrency=5, and can see five consumer threads logging every second:
2018-03-01 13:58:23.572 DEBUG 5968 --- [-kinesis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : No records for [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER, sequenceNumber='49582162487825732185088604198809382927677166286909997106', timestamp=null, stream='TestStream', shard='shardId-000000000003', reset=false}, state=CONSUME}] on sequenceNumber [null]. Suspend consuming for [1000] milliseconds.
But after sending in a bad message, the consumer fails and never runs again. The other 4 continue to run, but this particular shard ends up with no consumers reading from it. Bad juju.
Example stacktrace:
2018-03-01 13:58:38.713 ERROR 5968 --- [-kinesis-consumer-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unexpected character ('r' (code 114)): was expecting double-quote to start field name
at [Source: (byte[])"{ref: "XYZ123"}"; line: 1, column: 3]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('r' (code 114)): was expecting double-quote to start field name
at [Source: (byte[])"{reloc: "XYZ123"}"; line: 1, column: 3], failedMessage=GenericMessage [payload=byte[17], headers={aws_shard=shardId-000000000003, id=03d0e8c4-e9ee-d1a2-8188-08c13baadacf, contentType=application/json, aws_receivedStream=TestStream, aws_receivedPartitionKey=1, aws_receivedSequenceNumber=49582162487825732185088604198810591853496823934477140018, timestamp=1519876718698}]
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:235)
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:86)
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:181)
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:70)
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:137)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:141)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:438)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:388)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:197)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter.access$3600(KinesisMessageDrivenChannelAdapter.java:86)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.performSend(KinesisMessageDrivenChannelAdapter.java:957)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.processRecords(KinesisMessageDrivenChannelAdapter.java:928)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ShardConsumer.lambda$processTask$1(KinesisMessageDrivenChannelAdapter.java:835)
at org.springframework.integration.aws.inbound.kinesis.KinesisMessageDrivenChannelAdapter$ConsumerInvoker.run(KinesisMessageDrivenChannelAdapter.java:1018)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
I will probably work around this by receiving the message as a string and doing the JSON conversion manually, but it would be nice if the direct translation worked and handled errors without killing the consumer.
This is in spring-integration-aws-2.0.0.M1. I haven't tried other versions.