Delete consumed message from the queue in the SQS Consume task #155

Closed
anna-geller opened this issue May 22, 2023 · 1 comment · Fixed by #156
Labels
bug Something isn't working

Comments

@anna-geller
Member

anna-geller commented May 22, 2023

Expected Behavior

There is a bug in the io.kestra.plugin.aws.sqs.Consume task. In SQS, a message must be deleted from the queue after it has been consumed. Currently this does not happen, which can lead to a never-ending loop, or to a new message sometimes not triggering an execution when the task is used together with the SQS trigger.

You can reproduce it using:

https://github.com/anna-geller/kestra-terraform-examples/blob/main/aws_sqs_tf/sqs.tf

When two messages are sent (i.e., two tasks, each publishing one SQS message), I would expect each of these messages to trigger an execution of the flow that has an SQS trigger:

id: sqsPublishMessage
namespace: prod
tasks:
  - id: generateMessage
    type: io.kestra.core.tasks.debugs.Return
    format: "Hi from a Kestra task {{task.id}}"
  - id: publishMessage
    type: io.kestra.plugin.aws.sqs.Publish
    accessKeyId: "{{envs.aws_access_key_id}}"
    secretKeyId: "{{envs.aws_secret_access_key}}"
    region: eu-central-1
    queueUrl: https://sqs.eu-central-1.amazonaws.com/338306982838/kestra
    from:
      data: |
        {{outputs.generateMessage.value}}
  - id: publishMessage2
    type: io.kestra.plugin.aws.sqs.Publish
    accessKeyId: "{{envs.aws_access_key_id}}"
    secretKeyId: "{{envs.aws_secret_access_key}}"
    region: eu-central-1
    queueUrl: https://sqs.eu-central-1.amazonaws.com/338306982838/kestra
    from:
      data: This is a new message to check again

and reacting to those messages:

id: sqsReactToMessage
namespace: prod
tasks:
- id: consumeMessage
  type: io.kestra.plugin.aws.sqs.Consume
  accessKeyId: "{{envs.aws_access_key_id}}"
  secretKeyId: "{{envs.aws_secret_access_key}}"
  region: eu-central-1
  queueUrl: https://sqs.eu-central-1.amazonaws.com/338306982838/kestra
  maxRecords: 1
- id: print
  type: io.kestra.core.tasks.scripts.Bash
  commands:
  - "echo received message stored in {{outputs.consumeMessage.uri}}"
triggers:
- id: sqs
  type: io.kestra.plugin.aws.sqs.Trigger
  description: "to activate it, set disabled to false"
  disabled: false
  queueUrl: https://sqs.eu-central-1.amazonaws.com/338306982838/kestra
  accessKeyId: "{{envs.aws_access_key_id}}"
  secretKeyId: "{{envs.aws_secret_access_key}}"
  region: eu-central-1
  maxRecords: 1

All messages remain in the queue. They should be deleted as soon as they have been consumed.
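For reference, the receive-then-delete pattern expected here can be sketched in Python. This is only an illustration of the SQS contract, not the plugin's actual code: `consume_and_delete` and `InMemoryQueue` are hypothetical names, and the in-memory class is a stand-in that mimics the keyword arguments of boto3's `receive_message` and `delete_message` so the sketch runs without AWS credentials. It also ignores visibility timeouts, so an undeleted message is simply returned again on the next poll.

```python
from typing import Any, Dict, List


def consume_and_delete(client: Any, queue_url: str, max_records: int = 1) -> List[str]:
    """Receive up to max_records messages and delete each one after reading it."""
    response: Dict[str, Any] = client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_records,  # SQS accepts 1 to 10 per call
        WaitTimeSeconds=1,
    )
    bodies: List[str] = []
    # "Messages" is absent from the response when nothing was received.
    for message in response.get("Messages", []):
        bodies.append(message["Body"])
        # Without this call the message becomes visible again once its
        # visibility timeout expires, and a later poll receives it again.
        client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message["ReceiptHandle"],
        )
    return bodies


class InMemoryQueue:
    """Hypothetical offline stand-in for an SQS client (assumption, not AWS code)."""

    def __init__(self, bodies: List[str]) -> None:
        self._messages = {f"handle-{i}": body for i, body in enumerate(bodies)}

    def receive_message(self, QueueUrl: str, MaxNumberOfMessages: int = 1,
                        WaitTimeSeconds: int = 0) -> Dict[str, Any]:
        batch = list(self._messages.items())[:MaxNumberOfMessages]
        if not batch:
            return {}
        return {"Messages": [{"ReceiptHandle": h, "Body": b} for h, b in batch]}

    def delete_message(self, QueueUrl: str, ReceiptHandle: str) -> None:
        self._messages.pop(ReceiptHandle, None)

    def pending(self) -> int:
        return len(self._messages)


queue = InMemoryQueue(["first", "second"])
url = "https://sqs.example.invalid/queue"  # placeholder URL, never contacted
first = consume_and_delete(queue, url)
second = consume_and_delete(queue, url)
remaining = queue.pending()
```

With a real queue, the same function could be given `boto3.client("sqs", region_name="eu-central-1")` in place of the stand-in. Because each poll deletes what it read, the second call sees the second message rather than the first one again.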


Actual Behaviour

Apart from that, it would be great if the output would be stored as a variable rather than as a file in internal storage. The message is a small piece of text/metadata so it makes sense to make it available as a variable without the ion file's overhead in internal storage. The UX would benefit from that by making it possible to view the message output and pass it to downstream tasks.

So instead of: {{outputs.consumeMessage.uri}}
Having: {{outputs.consumeMessage.value}} would be beneficial


Steps To Reproduce

No response

Environment Information

  • Kestra Version: develop-full docker image

Example flow

No response

@anna-geller anna-geller added the bug Something isn't working label May 22, 2023
@loicmathieu
Member

> Apart from that, it would be great if the output would be stored as a variable rather than as a file in internal storage. The message is a small piece of text/metadata so it makes sense to make it available as a variable without the ion file's overhead in internal storage. The UX would benefit from that by making it possible to view the message output and pass it to downstream tasks.

We usually do not provide this functionality for message brokers. Consume and trigger tasks consume a list of messages, and messages in a broker can be of any size, so we strongly encourage users to store them in our internal storage.

You can open an enhancement request for that. We may revisit this, but it would require adding task properties to choose between STORE and FETCH (the same concept we have in many database query-based tasks).
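To make the STORE versus FETCH distinction concrete, here is a minimal Python sketch of how such a choice might shape a task's output. Everything in it is an assumption for illustration: `OutputMode`, `build_output`, the `rows`, `uri` and `count` keys, and the JSON Lines file (used here in place of the ion file in internal storage) are hypothetical and do not describe the plugin's real properties.

```python
import json
import tempfile
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List


class OutputMode(Enum):
    FETCH = "FETCH"  # return the messages inline, usable as variables
    STORE = "STORE"  # write the messages to a file and return its location


def build_output(messages: List[str], mode: OutputMode, storage_dir: Path) -> Dict[str, Any]:
    """Return consumed messages inline (FETCH) or as a file reference (STORE)."""
    if mode is OutputMode.FETCH:
        # Fine for a few small messages; everything is held in memory.
        return {"count": len(messages), "rows": list(messages)}
    # STORE keeps the output small regardless of how large the messages are.
    storage_dir.mkdir(parents=True, exist_ok=True)
    target = storage_dir / "messages.jsonl"
    with target.open("w", encoding="utf-8") as handle:
        for body in messages:
            handle.write(json.dumps({"data": body}) + "\n")
    return {"count": len(messages), "uri": target.resolve().as_uri()}


workdir = Path(tempfile.mkdtemp())  # stand-in for internal storage
fetched = build_output(["Hi from a Kestra task"], OutputMode.FETCH, workdir)
stored = build_output(["Hi from a Kestra task"], OutputMode.STORE, workdir)
```

The trade-off mirrors the comment above: the inline form is convenient for passing a short message to downstream tasks, while the stored form stays safe when a broker delivers many or large messages.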
