Describe the bug
The numa container crashes with the following error:
ipeline":"cloudwatch","vertex":"reducer-2xx","idleWatermark":1705651440000,"existingOffset":771644,"inputOffset":779003}
{"level":"warn","ts":"2024-01-19T08:06:56.11305699Z","logger":"numaflow.ReduceUDF-processor","caller":"timeline/offset_timeline.go:140","msg":"The idle watermark has a larger offset from the head idle watermark","pipeline":"cloudwatch","vertex":"reducer-2xx","idleWatermark":1705651440000,"existingOffset":771643,"inputOffset":779007}
{"level":"warn","ts":"2024-01-19T08:06:56.116730058Z","logger":"numaflow.ReduceUDF-processor","caller":"timeline/offset_timeline.go:140","msg":"The idle watermark has a larger offset from the head idle watermark","pipeline":"cloudwatch","vertex":"reducer-2xx","idleWatermark":1705651440000,"existingOffset":771641,"inputOffset":779020}
{"level":"warn","ts":"2024-01-19T08:06:56.122465142Z","logger":"numaflow.ReduceUDF-processor","caller":"timeline/offset_timeline.go:140","msg":"The idle watermark has a larger offset from the head idle watermark","pipeline":"cloudwatch","vertex":"reducer-2xx","idleWatermark":1705651440000,"existingOffset":771642,"inputOffset":779015}
{"level":"warn","ts":"2024-01-19T08:06:56.499376246Z","logger":"numaflow.ReduceUDF-processor","caller":"timeline/offset_timeline.go:140","msg":"The idle watermark has a larger offset from the head idle watermark","pipeline":"cloudwatch","vertex":"reducer-2xx","idleWatermark":1705651440000,"existingOffset":771638,"inputOffset":779093}
{"level":"info","ts":"2024-01-19T08:06:56.513884334Z","logger":"numaflow.ReduceUDF-processor","caller":"reduce/data_forward.go:622","msg":"Close of book","pipeline":"cloudwatch","vertex":"reducer-2xx","partitionID":"1705650840000-1705651440000-slot-0"}
{"level":"error","ts":"2024-01-19T08:06:56.521123607Z","logger":"numaflow.ReduceUDF-processor","caller":"pbq/pbq.go:57","msg":"Failed to write message to pbq, pbq is closed","pipeline":"cloudwatch","vertex":"reducer-2xx","PBQ":"1705650840000-1705651440000-slot-0","ID":"1705650840000-1705651440000-slot-0","header":{"EventTime":"2024-01-19T08:03:00Z","IsLate":true,"Kind":0,"ID":"5270215-6-alb-mapper-0","Keys":["902738387570","app/splunk-ebs-cmn-ppd-depcm/7e9dac9fb0d8bd01"]},"message":{"EventTime":"2024-01-19T08:03:00Z","IsLate":true,"Kind":0,"ID":"5270215-6-alb-mapper-0","Keys":["902738387570","app/splunk-ebs-cmn-ppd-depcm/7e9dac9fb0d8bd01"],"Payload":"xxxxxxxxxxxxxxxxxxxxxxxxxx","ReadOffset":{},"Watermark":"2024-01-19T08:04:00Z","Metadata":{"NumDelivered":1}},"stacktrace":"github.com/numaproj/numaflow/pkg/reduce/pbq.(*PBQ).Write\n\t/home/runner/work/numaflow/numaflow/pkg/reduce/pbq/pbq.go:57\ngithub.com/numaproj/numaflow/pkg/reduce.(*DataForward).writeToPBQ.func2\n\t/home/runner/work/numaflow/numaflow/pkg/reduce/data_forward.go:489\nk8s.io/apimachinery/pkg/util/wait.ConditionFunc.WithContext.func1\n\t/home/runner/go/pkg/mod/k8s.io/apimachinery@v0.26.3/pkg/util/wait/wait.go:222\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext\n\t/home/runner/go/pkg/mod/k8s.io/apimachinery@v0.26.3/pkg/util/wait/wait.go:235\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtection\n\t/home/runner/go/pkg/mod/k8s.io/apimachinery@v0.26.3/pkg/util/wait/wait.go:228\nk8s.io/apimachinery/pkg/util/wait.ExponentialBackoff\n\t/home/runner/go/pkg/mod/k8s.io/apimachinery@v0.26.3/pkg/util/wait/wait.go:423\ngithub.com/numaproj/numaflow/pkg/reduce.(*DataForward).writeToPBQ\n\t/home/runner/work/numaflow/numaflow/pkg/reduce/data_forward.go:488\ngithub.com/numaproj/numaflow/pkg/reduce.(*DataForward).writeMessagesToWindows\n\t/home/runner/work/numaflow/numaflow/pkg/reduce/data_forward.go:452\ngithub.com/numaproj/numaflow/pkg/reduce.(*DataForward).Process\n\t/home/runner/work/numaflow/numaflow/pkg/reduce/data_forward.go:338\ngithub.com/numaproj/numaflow/pkg/reduce.(*DataForward).forwardAChunk\n\t/home/runner/work/numaflow/numaflow/pkg/reduce/data_forward.go:270\ngithub.com/numaproj/numaflow/pkg/reduce.(*DataForward).Start\n\t/home/runner/work/numaflow/numaflow/pkg/reduce/data_forward.go:142\ngithub.com/numaproj/numaflow/pkg/udf.(*ReduceUDFProcessor).Start.func4\n\t/home/runner/work/numaflow/numaflow/pkg/udf/reduce_udf.go:270"}
{"level":"info","ts":"2024-01-19T08:06:56.522956976Z","logger":"numaflow.ReduceUDF-processor","caller":"reduce/data_forward.go:622","msg":"Close of book","pipeline":"cloudwatch","vertex":"reducer-2xx","partitionID":"1705650840000-1705651440000-slot-0"}
panic: close of closed channel
goroutine 355 [running]:
github.com/numaproj/numaflow/pkg/reduce/pbq.(*PBQ).CloseOfBook(0xc000532210)
/home/runner/work/numaflow/numaflow/pkg/reduce/pbq/pbq.go:83 +0x25
github.com/numaproj/numaflow/pkg/reduce.(*DataForward).ClosePartitions(0xc0005a6000, {0xc000f3fa40?, 0x1, 0xc0000129f8?})
/home/runner/work/numaflow/numaflow/pkg/reduce/data_forward.go:625 +0x255
github.com/numaproj/numaflow/pkg/reduce.(*DataForward).Process(0xc0005a6000, {0x293e9e0, 0xc000992000}, {0xc000924030, 0x1, 0x0?})
/home/runner/work/numaflow/numaflow/pkg/reduce/data_forward.go:366 +0xd3c
github.com/numaproj/numaflow/pkg/reduce.(*DataForward).forwardAChunk(0xc0005a6000, {0x293e9e0, 0xc000992000})
/home/runner/work/numaflow/numaflow/pkg/reduce/data_forward.go:270 +0xdc7
github.com/numaproj/numaflow/pkg/reduce.(*DataForward).Start(0xc0005a6000)
/home/runner/work/numaflow/numaflow/pkg/reduce/data_forward.go:142 +0x30
github.com/numaproj/numaflow/pkg/udf.(*ReduceUDFProcessor).Start.func4()
/home/runner/work/numaflow/numaflow/pkg/udf/reduce_udf.go:270 +0x79
created by github.com/numaproj/numaflow/pkg/udf.(*ReduceUDFProcessor).Start
/home/runner/work/numaflow/numaflow/pkg/udf/reduce_udf.go:268 +0x20a5
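In the logs above, "Close of book" is logged twice for the same partitionID (1705650840000-1705651440000-slot-0) right before the panic, and the trace ends in CloseOfBook. That is consistent with the same channel being closed twice. The sketch below is not numaflow's code: `book`, `newBook` and `panicsOnSecondCall` are hypothetical names used only to reproduce this class of panic in isolation and to show one common way to make a close idempotent, using `sync.Once` from the Go standard library.

```go
package main

import (
	"fmt"
	"sync"
)

// book is a hypothetical stand-in for a per-window queue.
// It is NOT numaflow's PBQ type.
type book struct {
	ch        chan string
	closeOnce sync.Once
}

func newBook(size int) *book {
	return &book{ch: make(chan string, size)}
}

// CloseOfBook closes the channel at most once, so a repeated call is a
// no-op instead of a "close of closed channel" panic.
func (b *book) CloseOfBook() {
	b.closeOnce.Do(func() { close(b.ch) })
}

// panicsOnSecondCall calls closeFn twice and reports whether a panic occurred.
func panicsOnSecondCall(closeFn func()) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	closeFn()
	closeFn()
	return false
}

func main() {
	// Closing a plain channel twice panics, as in the trace above.
	raw := make(chan string)
	fmt.Println("plain close panics:", panicsOnSecondCall(func() { close(raw) }))

	// The sync.Once guard turns the second close into a no-op.
	b := newBook(1)
	fmt.Println("guarded close panics:", panicsOnSecondCall(b.CloseOfBook))
}
```

A guard like this only prevents the crash. It would not explain why the partition was closed twice, and it would not address the earlier "Failed to write message to pbq, pbq is closed" error for a late message (IsLate: true) targeting the same window.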
Message from the maintainers:
Impacted by this bug? Give it a 👍. We often sort issues this way to know what to prioritize.