
Script application utilizing 100% CPU. #334

Closed
dkumarkartik opened this issue Sep 30, 2022 · 17 comments

@dkumarkartik

Hello Team,
We are using script processor application in our stream definition. We have seen that script application is processing records at very slow rate. We observed that CPU is utilized 100% on that application. We had deployed stream with default CPU (500m) and MEMORY (1Gi) settings. MEMORY utilization was under limit.

However we tried to increase the CPU limit (2000m) for script application and we saw that script application was performing better than previous deployment. This time CPU utilization was around 60% only.

Just wanted to know are there any recommendation for performance improvement. I am attaching screen shots just for reference.

[screenshots attached]

@onobc
Collaborator

onobc commented Sep 30, 2022

Hi @dkumarkartik ,

I am not aware of any script-processor-specific performance suggestions/tips. A few questions:

  1. Are you doing anything complicated in the script? If so, maybe try the most basic script instead to rule out something in the script itself. If the problem persists, we know it's not caused by the script.
  2. What language are you using? It would be interesting to try it in another language to see if it has something to do with that.
  3. Is this specific to the script processor, or do you see the same behavior for any of the other OOTB stream apps?

Thanks

@dkumarkartik
Author


Hello @onobc,
Thanks for your note.
We are using JavaScript as the script processor language, and I am sure we are not doing any complicated computation in the script; we are just adding a property to the payload based on a condition.

@onobc
Collaborator

onobc commented Oct 3, 2022

Hi @dkumarkartik,

> It would be interesting to try it in another language to see if it has something to do with that.

Can you give this a try and report findings?

> Is this specific to the script processor, or do you see the same behavior for any of the other OOTB stream apps?

Are you seeing this on other OOTB stream apps?

Thanks

@corneil
Collaborator

corneil commented Oct 4, 2022

Hi @dkumarkartik,

Did you increase both the request and the limit on CPU?
What is the size of the payload?
What is the rate at which messages are processed?

@dkumarkartik
Author


Hello,
Sure, we will try with a different language and post the findings.
The other OOTB stream apps are not nearly as slow as the script processor.

@corneil
Collaborator

corneil commented Oct 18, 2022

Please provide details of what the specific script is doing and which OOTB apps you are comparing it to.
Do you have metrics on throughput?

@dkumarkartik
Author


Hello,
We did some test runs and have the below observations.

1. We created a stream that reads data from S3 buckets using the S3 source application; the script application then processes those records and sends them to a Kafka topic.
2. On the S3 bucket we have a 1 GB file containing 3 million records.
3. S3 processes the 1 GB file in 6-8 min, whereas the script application processes those records in around 1 hr 45 min with the default memory and CPU settings.
4. With increased CPU (increased to 6), the script application processes those same records in around 40 min.
5. We observed that the script application's throughput is very low compared to the S3 application. Throughput of the S3 application was more than 6K, while throughput of the script application was around 600; with increased CPU (6) the script application's throughput was 1.6K.
6. Attaching some screenshots of the Grafana dashboard, and the script we used for testing.

function test(p) {
    try {
        p = JSON.parse(p);
        p.custom_field = 'NA';
    } catch (e) {
        throw e;
    }
    return p;
}
test(payload);

[screenshot: grafana_test_script]

[screenshot: grafana_throughput]

@artembilan
Collaborator

Isn't JSON.parse(p) the culprit against 1 GB of data?

Let's consider replacing your script processor with a plain filter-processor: https://github.com/spring-cloud/stream-applications/tree/main/applications/processor/filter-processor

So, I'd try something like --filter.function.expression=#jsonPath(payload,'$.custom_field')=='NA'

@Noushath


@artembilan actually the JSON is not 1 GB in size. The input file to the pipeline is a 1 GB file with 3 million records, which is read by the S3 source app and passed on to the script application. A sample record looks like {"operation":"createhouse","id":"3786216","comingFrom":{"source":"TEST"},"properties":{"disp_label":"TEST 2","type":"house"},"label":"house"}, and the script processor handles just one such record at a time from the input Kafka topic. Request your help to understand why the script processor takes so long to complete.

@artembilan
Collaborator

No, you didn't mention that you split those "3 million records" into individual messages to be processed one by one in the subsequent script processor.
The script engine in Java is slow by itself.
I cannot find the resource at the moment, but I have also heard that Nashorn (the JavaScript engine) is the slowest JavaScript interpreter.
I believe there is really not much we can do for your script.
As I said before: consider using some other solution, not script.
Or use some other language, like Groovy.
Scripting is used in dynamic environments, where the script can be reloaded at runtime for business reasons.
This is really a case where we agreed on flexibility over performance.

Also, it is not clear whether you run your whole data flow and the mentioned stream on the same machine, or whether it is really distributed.
To get better performance from this script processor, consider deploying several instances of it.
That way, all of them will consume their own partition from the Kafka topic and process records in parallel.
But indeed it is better to have them in their own containers.

Does it make sense to you?
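The several-instances suggestion can be sketched as an SCDF deployment property (a hedged example; the app label `script` and stream name `my-stream` are hypothetical, and the exact property usage should be checked against the Spring Cloud Data Flow deployer documentation):

```
# From the SCDF shell: deploy three instances of the script processor.
# With the Kafka binder, the instances share one consumer group, so the
# broker assigns each instance its own partition(s) of the input topic.
stream deploy my-stream --properties "deployer.script.count=3"
```

Note this only helps if the input topic has at least as many partitions as processor instances.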

@Noushath


@artembilan thanks for the details. Yes, the script processors are running in their own containers in k8s but are not currently parallel; all records are in one topic and handled by the same script container. We will keep you posted once we try other options and see some difference.

@Noushath

@onobc / @corneil / @artembilan, request your help to get the below Groovy script working for our app with the groovy processor. We see the syntax is correct, but it is not working from the groovy processor:

import groovy.json.JsonSlurper; import groovy.json.JsonOutput; try{ println payload.getClass(); // to see which classtype def jsonSlurper = new JsonSlurper(); def obj = jsonSlurper.parseText(payload); obj.custom_field="NA"; return JsonOutput.toJson(obj); }catch(Exception e){ throw e;}

But no luck; it's throwing an error: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@4a6be945]; nested exception is java.io.UncheckedIOException: java.io.FileNotFoundException: Could not open ServletContext resource [/import groovy.json.JsonSlurper.......

@artembilan
Collaborator

@Noushath ,

I doubt that your Groovy script is really declared as a single line.
Please paste it here in a multi-line manner, as you use it on your side.

Also, please paste the stack trace in a multi-line manner as well.
And I also think there is more to this exception.
Show all of it.

Otherwise it is not readable, and it is going to be hard to help.

@akbar-deloitte

akbar-deloitte commented Nov 23, 2022

@artembilan & @Noushath,
The exported stream definition with the groovy application is as below.

{
    "date": 1669183955476,
    "streams": [
        {
            "name": "groovy-script-1",
            "dslText": ":test1 > test-groovy-script: groovy --management.metrics.tags.application.type=${spring.cloud.dataflow.stream.app.type:unknown} --management.metrics.export.prometheus.enabled=true --management.metrics.tags.stream.name=${spring.cloud.dataflow.stream.name:unknown} --management.metrics.tags.application=${spring.cloud.dataflow.stream.name:unknown}-${spring.cloud.dataflow.stream.app.label:unknown}-${spring.cloud.dataflow.stream.app.type:unknown} --spring.cloud.stream.kafka.streams.binder.zknodes=zookeeper:2181 --management.metrics.tags.instance.index=${vcap.application.instance_index:${spring.cloud.stream.instanceIndex:0}} --spring.cloud.stream.kafka.streams.binder.brokers=PLAINTEXT://broker:9092 --wavefront.application.service=${spring.cloud.dataflow.stream.app.label:unknown}-${spring.cloud.dataflow.stream.app.type:unknown}-${vcap.application.instance_index:${spring.cloud.stream.instanceIndex:0}} --management.metrics.tags.application.guid=${spring.cloud.application.guid:unknown} --groovy-processor.script=\"import groovy.json.JsonSlurper; import groovy.json.JsonOutput; try { println payload.getClass(); def jsonSlurper = new JsonSlurper(); def obj = jsonSlurper.parseText(payload); obj.custom_field='NA'; return JsonOutput.toJson(obj); } catch(Exception e){ throw e }\" --management.metrics.export.prometheus.rsocket.port=7001 --management.metrics.tags.application.name=${vcap.application.application_name:${spring.cloud.dataflow.stream.app.label:unknown}} --management.metrics.export.prometheus.rsocket.enabled=true --management.metrics.export.prometheus.rsocket.host=prometheus-rsocket-proxy --spring.cloud.stream.kafka.binder.zknodes=zookeeper:2181 --wavefront.application.name=${spring.cloud.dataflow.stream.name:unknown} --spring.kafka.streams.properties.metrics.recording.level=DEBUG --spring.cloud.stream.kafka.binder.brokers=PLAINTEXT://broker:9092 > :test2",
            "originalDslText": ":test1 > test-groovy-script: groovy --script=\"import groovy.json.JsonSlurper; import groovy.json.JsonOutput; try { println payload.getClass(); def jsonSlurper = new JsonSlurper(); def obj = jsonSlurper.parseText(payload); obj.custom_field='NA'; return JsonOutput.toJson(obj); } catch(Exception e){ throw e }\" > :test2",
            "description": "",
            "status": "DEPLOYED"
        }
    ]
}

The stream deploys successfully, and below is the stack trace we get when a message is sent to the source topic (e.g. test1 in this stream).
Stack trace:

2022-11-23 11:44:47.578  INFO [groovy-processor,,] 5159 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$2  : groovy-script-1: partitions assigned: [test1-0]



2022-11-23 11:45:56.294 ERROR [groovy-processor,810fddd6a14c3f72,2828315f1da073db] 5159 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@5f42821a]; nested exception is java.io.UncheckedIOException: java.io.FileNotFoundException: Could not open ServletContext resource [/import groovy.json.JsonSlurper; import groovy.json.JsonOutput; try { println payload.getClass(); def jsonSlurper = new JsonSlurper(); def obj = jsonSlurper.parseText(payload); obj.custom_field='NA'; return JsonOutput.toJson(obj); } catch(Exception e){ throw e }], failedMessage=GenericMessage [payload=byte[14], headers={deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=test1, target-protocol=kafka, b3=810fddd6a14c3f72-810fddd6a14c3f72-0, nativeHeaders={}, kafka_offset=2, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1e43813f, id=b6783866-abe0-8108-a15f-af573a80ee1a, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1669203952113, contentType=application/json, kafka_groupId=groovy-script-1, timestamp=1669203956288}]
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454)
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428)
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125)
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.io.UncheckedIOException: java.io.FileNotFoundException: Could not open ServletContext resource [/import groovy.json.JsonSlurper; import groovy.json.JsonOutput; try { println payload.getClass(); def jsonSlurper = new JsonSlurper(); def obj = jsonSlurper.parseText(payload); obj.custom_field='NA'; return JsonOutput.toJson(obj); } catch(Exception e){ throw e }]
	at org.springframework.integration.groovy.GroovyScriptExecutingMessageProcessor.parseScriptIfNecessary(GroovyScriptExecutingMessageProcessor.java:194)
	at org.springframework.integration.groovy.GroovyScriptExecutingMessageProcessor.executeScript(GroovyScriptExecutingMessageProcessor.java:171)
	at org.springframework.integration.scripting.AbstractScriptExecutingMessageProcessor.processMessage(AbstractScriptExecutingMessageProcessor.java:91)
	at org.springframework.cloud.stream.app.processor.groovy.GroovyProcessorConfiguration.lambda$groovyProcessorFunction$0(GroovyProcessorConfiguration.java:56)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunctionAndEnrichResultIfNecessary(SimpleFunctionRegistry.java:908)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunction(SimpleFunctionRegistry.java:864)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:719)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$andThen$0(SimpleFunctionRegistry.java:634)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:710)
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:562)
	at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84)
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:790)
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:622)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
	... 32 more
Caused by: java.io.FileNotFoundException: Could not open ServletContext resource [/import groovy.json.JsonSlurper; import groovy.json.JsonOutput; try { println payload.getClass(); def jsonSlurper = new JsonSlurper(); def obj = jsonSlurper.parseText(payload); obj.custom_field='NA'; return JsonOutput.toJson(obj); } catch(Exception e){ throw e }]
	at org.springframework.web.context.support.ServletContextResource.getInputStream(ServletContextResource.java:159)
	at org.springframework.core.io.support.EncodedResource.getReader(EncodedResource.java:143)
	at org.springframework.scripting.support.ResourceScriptSource.getScriptAsString(ResourceScriptSource.java:102)
	at org.springframework.integration.groovy.GroovyScriptExecutingMessageProcessor.parseScriptIfNecessary(GroovyScriptExecutingMessageProcessor.java:185)
	... 45 more

2022-11-23 11:45:56.306 ERROR [groovy-processor,,] 5159 --- [container-0-C-1] o.s.kafka.listener.DefaultErrorHandler   : Backoff none exhausted for test1-0@2

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@5f42821a]; nested exception is java.io.UncheckedIOException: java.io.FileNotFoundException: Could not open ServletContext resource [/import groovy.json.JsonSlurper; import groovy.json.JsonOutput; try { println payload.getClass(); def jsonSlurper = new JsonSlurper(); def obj = jsonSlurper.parseText(payload); obj.custom_field='NA'; return JsonOutput.toJson(obj); } catch(Exception e){ throw e }], failedMessage=GenericMessage [payload=byte[14], headers={deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=test1, target-protocol=kafka, b3=810fddd6a14c3f72-810fddd6a14c3f72-0, nativeHeaders={}, kafka_offset=2, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1e43813f, id=b6783866-abe0-8108-a15f-af573a80ee1a, kafka_receivedPartitionId=0, kafka_receivedTimestamp=1669203952113, contentType=application/json, kafka_groupId=groovy-script-1, timestamp=1669203956288}]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2665) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2625) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2552) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2433) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2311) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1982) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source) ~[na:na]
	at java.base/java.lang.Thread.run(Unknown Source) ~[na:na]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@5f42821a]; nested exception is java.io.UncheckedIOException: java.io.FileNotFoundException: Could not open ServletContext resource [/import groovy.json.JsonSlurper; import groovy.json.JsonOutput; try { println payload.getClass(); def jsonSlurper = new JsonSlurper(); def obj = jsonSlurper.parseText(payload); obj.custom_field='NA'; return JsonOutput.toJson(obj); } catch(Exception e){ throw e }]
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.20.jar!/:5.3.20]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.20.jar!/:5.3.20]
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.20.jar!/:5.3.20]
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.20.jar!/:5.3.20]
	at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397) ~[spring-integration-kafka-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83) ~[spring-integration-kafka-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:454) ~[spring-integration-kafka-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:428) ~[spring-integration-kafka-5.5.12.jar!/:5.5.12]
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.3.jar!/:na]
	at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255) ~[spring-retry-1.3.3.jar!/:na]
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2645) ~[spring-kafka-2.8.6.jar!/:2.8.6]
	... 11 common frames omitted
Caused by: java.io.UncheckedIOException: java.io.FileNotFoundException: Could not open ServletContext resource [/import groovy.json.JsonSlurper; import groovy.json.JsonOutput; try { println payload.getClass(); def jsonSlurper = new JsonSlurper(); def obj = jsonSlurper.parseText(payload); obj.custom_field='NA'; return JsonOutput.toJson(obj); } catch(Exception e){ throw e }]
	at org.springframework.integration.groovy.GroovyScriptExecutingMessageProcessor.parseScriptIfNecessary(GroovyScriptExecutingMessageProcessor.java:194) ~[spring-integration-groovy-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.groovy.GroovyScriptExecutingMessageProcessor.executeScript(GroovyScriptExecutingMessageProcessor.java:171) ~[spring-integration-groovy-5.5.12.jar!/:5.5.12]
	at org.springframework.integration.scripting.AbstractScriptExecutingMessageProcessor.processMessage(AbstractScriptExecutingMessageProcessor.java:91) ~[spring-integration-scripting-5.5.12.jar!/:5.5.12]
	at org.springframework.cloud.stream.app.processor.groovy.GroovyProcessorConfiguration.lambda$groovyProcessorFunction$0(GroovyProcessorConfiguration.java:56) ~[groovy-processor-3.2.1.jar!/:na]
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunctionAndEnrichResultIfNecessary(SimpleFunctionRegistry.java:908) ~[spring-cloud-function-context-3.2.5.jar!/:3.2.5]
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunction(SimpleFunctionRegistry.java:864) ~[spring-cloud-function-context-3.2.5.jar!/:3.2.5]
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:719) ~[spring-cloud-function-context-3.2.5.jar!/:3.2.5]
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$andThen$0(SimpleFunctionRegistry.java:634) ~[spring-cloud-function-context-3.2.5.jar!/:3.2.5]
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:710) ~[spring-cloud-function-context-3.2.5.jar!/:3.2.5]
	at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:562) ~[spring-cloud-function-context-3.2.5.jar!/:3.2.5]
	at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:84) ~[spring-cloud-stream-3.2.4.jar!/:3.2.4]
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:790) ~[spring-cloud-stream-3.2.4.jar!/:3.2.4]
	at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:622) ~[spring-cloud-stream-3.2.4.jar!/:3.2.4]
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56) ~[spring-integration-core-5.5.12.jar!/:5.5.12]
	... 32 common frames omitted
Caused by: java.io.FileNotFoundException: Could not open ServletContext resource [/import groovy.json.JsonSlurper; import groovy.json.JsonOutput; try { println payload.getClass(); def jsonSlurper = new JsonSlurper(); def obj = jsonSlurper.parseText(payload); obj.custom_field='NA'; return JsonOutput.toJson(obj); } catch(Exception e){ throw e }]
	at org.springframework.web.context.support.ServletContextResource.getInputStream(ServletContextResource.java:159) ~[spring-web-5.3.20.jar!/:5.3.20]
	at org.springframework.core.io.support.EncodedResource.getReader(EncodedResource.java:143) ~[spring-core-5.3.20.jar!/:5.3.20]
	at org.springframework.scripting.support.ResourceScriptSource.getScriptAsString(ResourceScriptSource.java:102) ~[spring-context-5.3.20.jar!/:5.3.20]
	at org.springframework.integration.groovy.GroovyScriptExecutingMessageProcessor.parseScriptIfNecessary(GroovyScriptExecutingMessageProcessor.java:185) ~[spring-integration-groovy-5.5.12.jar!/:5.5.12]
	... 45 common frames omitted

@artembilan
Copy link
Collaborator

groovy --script=\"import groovy.json.JsonSlurper;

That's not correct.
The documentation doesn't state that the Groovy script processor supports an in-line script definition: https://github.com/spring-cloud/stream-applications/tree/main/applications/processor/groovy-processor

groovy-processor.script
Reference to a script used to process messages. (Resource, default: )

That is a reference to a script file (a Spring Resource), definitely not an in-line definition.

Please revise your configuration accordingly.
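As an illustration (the file path is an assumption, not taken from this thread), the in-line logic from the error above could instead be saved to a script file and referenced as a file: resource:

```groovy
// /opt/scripts/add-custom-field.groovy  (illustrative path)
// The 'payload' binding is supplied by the Groovy processor at runtime.
import groovy.json.JsonSlurper
import groovy.json.JsonOutput

def obj = new JsonSlurper().parseText(payload)
obj.custom_field = 'NA'
return JsonOutput.toJson(obj)
```

The stream definition would then point at the file rather than embedding the script, e.g. ... | groovy --groovy-processor.script=file:///opt/scripts/add-custom-field.groovy | ...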

@Noushath
Copy link

Noushath commented Jan 6, 2023

groovy --script=\"import groovy.json.JsonSlurper;


We have modified our streams to use Groovy, and it performs better than JavaScript.
One open question we have: is it possible to modify message headers from the script processor, the same way we modify the payload?

@artembilan
Copy link
Collaborator

Happy New Year, @Noushath!

Thank you for confirming that the JavaScript engine was indeed the bottleneck in the whole solution.
Glad to see that the Groovy migration has fixed your problem.

Re. message headers: yes, it is possible to add/remove/replace headers from the script, as long as you return a whole new Message object from the script.
Right now you return JsonOutput.toJson(obj), which becomes the reply message payload.
To control the headers as well, look into the org.springframework.integration.support.MessageBuilder API, which lets you build a reply message with not only the modified payload but also the desired headers.
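A minimal sketch of that approach (the header name and value are illustrative; the payload and headers bindings are provided by the processor at runtime):

```groovy
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.springframework.integration.support.MessageBuilder

def obj = new JsonSlurper().parseText(payload)
obj.custom_field = 'NA'

// Return a full Message so headers can be set alongside the payload
return MessageBuilder
        .withPayload(JsonOutput.toJson(obj))
        .copyHeaders(headers)                           // keep the incoming headers
        .setHeader('processedBy', 'groovy-processor')   // add/replace a custom header (illustrative)
        .build()
```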

I think we can close this issue now, since we all agreed that the JavaScript engine was not a good choice for the target solution.

Thank you everyone for the support and discussion!
