Contributor

@prasadlohakpure prasadlohakpure commented Dec 8, 2025

Description

Panic fix for sqs while pushing messages on output channel

  • Existing behaviour:
    Pipelines involving SQS tasks intermittently fail with panic: send on closed channel.

  • Expected behaviour:
    No intermittent panics for pipelines involving SQS task types.

  • Reason:

getMessages and the processReceipts goroutines share the same context. Intermittently, the processReceipts goroutines exit because of context cancellation, so the wg.Wait() on them is satisfied, the Run func returns, and the task's output channel is eventually closed.

Meanwhile, before this change, getMessages could still be inside s.SendData, trying to send on the output channel. Once the channel was closed, that send panicked.

The stack trace below shows that getMessages was still trying to send on the task's output channel even though it was closed. There can be only one reason for the output channel to be closed: the SQS task's Run function returned.

  • Stacktrace:
panic: send on closed channel
goroutine 598 [running]:
github.com/patterninc/caterpillar/internal/pkg/pipeline/task.(*Base).SendRecord.func1()
/home/runner/work/caterpillar/caterpillar/internal/pkg/pipeline/task/task.go:110 +0x1b
github.com/patterninc/caterpillar/internal/pkg/pipeline/task.(*Base).SendRecord(0xc000217080, 0xc0231dd440, 0xec?)
/home/runner/work/caterpillar/caterpillar/internal/pkg/pipeline/task/task.go:138 +0x318
github.com/patterninc/caterpillar/internal/pkg/pipeline/task.(*Base).SendData(0xc000217080, {0x1b36ac0, 0xc00019f110}, {0xc0022d6d20, 0xec, 0xf0}, 0xc0001953b0)
/home/runner/work/caterpillar/caterpillar/internal/pkg/pipeline/task/task.go:99 +0x10e
github.com/patterninc/caterpillar/internal/pkg/pipeline/task/sqs.(*sqs).getMessages(0xc000217080, {0x1b369e0?, 0x2669620?}, 0xc0001953b0, 0xc0001956c0)
/home/runner/work/caterpillar/caterpillar/internal/pkg/pipeline/task/sqs/sqs.go:162 +0x58a
created by github.com/patterninc/caterpillar/internal/pkg/pipeline/task/sqs.(*sqs).Run in goroutine 556
/home/runner/work/caterpillar/caterpillar/internal/pkg/pipeline/task/sqs/sqs.go:101 +0xd4

The issue can be reproduced under the following conditions:

  1. The output channel buffer is small.
  2. Downstream tasks process at lower throughput, so the output channel drains slowly.
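
The interleaving described above can be forced deterministically in a small standalone Go program. This is only a sketch under assumptions: reproduce and every channel name in it are hypothetical and none of it is taken from the caterpillar code. The forced signal stands in for "getMessages is blocked on a full output channel", and the recover exists only so the demo can report the panic instead of crashing.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// reproduce forces the problematic ordering: the worker exits on context
// cancellation, wg.Wait() returns, the output channel is closed, and only
// then does the untracked producer attempt its send.
func reproduce() string {
	ctx, cancel := context.WithCancel(context.Background())
	out := make(chan string)         // stands in for a small, slowly drained output channel
	outClosed := make(chan struct{}) // demo-only signal used to force the ordering
	result := make(chan string, 1)

	// Untracked producer: nothing waits for this goroutine to finish.
	go func() {
		defer func() {
			r := recover() // demo-only: capture the panic instead of crashing
			result <- fmt.Sprint(r)
		}()
		<-outClosed      // stands in for being blocked on a full output channel
		out <- "message" // panics: the channel is already closed
	}()

	// Worker that shares the context and exits when it is cancelled.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		<-ctx.Done()
	}()

	cancel()
	wg.Wait()  // satisfied even though the producer is still running
	close(out) // what happens to the output channel once Run has returned
	close(outClosed)
	return <-result
}

func main() {
	fmt.Println(reproduce()) // prints "send on closed channel"
}
```

In the real pipeline the ordering is not forced, which is why the panic shows up only intermittently.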
  • Fix:
    Make SQS polling synchronous, and wait for processReceipts (the func that pushes messages to the output) to complete.

  • Testing:
    Tested using test/pipelines/sqs_with_context_concurrency.yaml

Types of changes

  • Docs change / refactoring / dependency upgrade
  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Checklist

  • My code follows the code style of this project.
  • My change requires a change to the documentation and I have updated the documentation accordingly.
  • I have added tests to cover my changes.
  • I have checked downstream dependencies (e.g. ExternalTaskSensors) by searching for DAG name elsewhere in the repo

Copilot AI review requested due to automatic review settings December 8, 2025 04:38
Contributor

Copilot AI left a comment

Pull request overview

This PR fixes an intermittent panic ("send on closed channel") in the SQS task implementation by addressing a race condition in the message polling logic.

Key changes:

  • Made getMessages synchronous instead of running it as a goroutine
  • Changed return value from always nil to the actual error from getMessages
  • Ensured deterministic execution order: getMessages completes before Run returns

@prasadlohakpure prasadlohakpure changed the title from "Panic fix for sqs while pushing messages on output channel" to "Panic fix for sqs task while pushing messages on output channel" Dec 8, 2025
@shubham-n-khanna shubham-n-khanna removed the request for review from hladush December 8, 2025 12:11
@shubham-n-khanna shubham-n-khanna requested a review from a team as a code owner December 8, 2025 12:36
@prasadlohakpure prasadlohakpure requested review from divyanshu-tiwari and removed request for divyanshu-tiwari December 8, 2025 12:38
@shubham-n-khanna shubham-n-khanna requested a review from a team December 8, 2025 12:38
Contributor

@hladush hladush left a comment

> If the Run func for sqs was finishing first (but not getMessages) due to non deterministic nature of go routines,

How is that possible?
The Run function finishes only when all processReceipts goroutines have finished. processReceipts can finish only when the receipts channel is closed. And that channel is closed only when getMessages has finished.

I'm fine with this change, but this part of the description is not correct:

> Reason: We were spawning getMessages(func to pull sqs messages) in a separate go routine. If the Run func for sqs was finishing first (but not getMessages) due to non deterministic nature of go routines, (please explain how this is possible) it will still try to push messages in output channel, even though output channel has closed after Run completion.

Contributor Author

prasadlohakpure commented Dec 10, 2025

> How is it possible?

@hladush
getMessages and the processReceipts goroutines share the same context. Intermittently, the processReceipts goroutines exit because of context cancellation, so the wg.Wait() on them is satisfied, the Run func returns, and the task's output channel is eventually closed.

Meanwhile, before this change, getMessages could still be inside s.SendData, trying to send on the output channel. Once the channel was closed, that send panicked.

The error message below shows that getMessages was still trying to send on the task's output channel even though it was closed. There can be only one reason for the output channel to be closed: the SQS task's Run function returned.
So I would still consider my reasoning to be correct. Let me know your thoughts.

panic: send on closed channel
goroutine 598 [running]:
github.com/patterninc/caterpillar/internal/pkg/pipeline/task.(*Base).SendRecord.func1()
/home/runner/work/caterpillar/caterpillar/internal/pkg/pipeline/task/task.go:110 +0x1b
github.com/patterninc/caterpillar/internal/pkg/pipeline/task.(*Base).SendRecord(0xc000217080, 0xc0231dd440, 0xec?)
/home/runner/work/caterpillar/caterpillar/internal/pkg/pipeline/task/task.go:138 +0x318
github.com/patterninc/caterpillar/internal/pkg/pipeline/task.(*Base).SendData(0xc000217080, {0x1b36ac0, 0xc00019f110}, {0xc0022d6d20, 0xec, 0xf0}, 0xc0001953b0)
/home/runner/work/caterpillar/caterpillar/internal/pkg/pipeline/task/task.go:99 +0x10e
github.com/patterninc/caterpillar/internal/pkg/pipeline/task/sqs.(*sqs).getMessages(0xc000217080, {0x1b369e0?, 0x2669620?}, 0xc0001953b0, 0xc0001956c0)
/home/runner/work/caterpillar/caterpillar/internal/pkg/pipeline/task/sqs/sqs.go:162 +0x58a
created by github.com/patterninc/caterpillar/internal/pkg/pipeline/task/sqs.(*sqs).Run in goroutine 556
/home/runner/work/caterpillar/caterpillar/internal/pkg/pipeline/task/sqs/sqs.go:101 +0xd4

The issue can be reproduced under the following conditions:

  1. The output channel buffer is small.
  2. Downstream tasks process at lower throughput, so the output channel drains slowly.

Thinking about this further, another condition can still arise: the processReceipts goroutines have exited, but getMessages has not yet finished pushing to receipts. In that case a few messages will be left in the receipts channel, because nothing is reading from it any more. I haven't encountered this condition yet, so this is only speculation.

Do you think we should remove the common context from processReceipts and change the func to exit only when there is nothing more to receive from the channel? That way we can clear the remaining messages even after message polling has stopped.
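
A rough sketch of that idea, assuming the polling side closes the receipts channel when it stops: the workers take no context and exit only once the channel is both closed and empty, so anything already queued is still handled. drainAll and the counter are hypothetical names for illustration, and the counter stands in for whatever processReceipts actually does with each receipt.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// drainAll starts the given number of workers and returns how many receipts
// they handled. Each worker exits only when receipts is closed and empty.
func drainAll(receipts <-chan string, workers int) int64 {
	var handled int64
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range receipts { // ends only after close and full drain
				atomic.AddInt64(&handled, 1) // stand-in for processing a receipt
			}
		}()
	}
	wg.Wait()
	return handled
}

func main() {
	receipts := make(chan string, 8)
	for _, r := range []string{"r1", "r2", "r3"} {
		receipts <- r
	}
	close(receipts) // polling has stopped, but three receipts are still queued

	fmt.Println(drainAll(receipts, 2)) // prints 3
}
```

The trade-off is that shutdown now depends on the writer always closing the channel, including on error paths.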

Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 1 out of 1 changed files in this pull request and generated no new comments.


Contributor

hladush commented Dec 10, 2025

@prasadlohakpure a few comments.

  • First of all, thanks for the detailed explanation.
  • Second, we should start passing the context as the first parameter of functions instead of taking it from env variables.
  • Third, we should try to stick to the pattern that whoever writes to a channel closes it.

For this particular use case I would just add getMessages to the wg because it's easier to understand, but what you did is correct.

@prasadlohakpure prasadlohakpure merged commit 678a325 into main Dec 11, 2025
12 checks passed
@prasadlohakpure prasadlohakpure deleted the sqs_panic_fix branch December 11, 2025 16:16
6 participants