cancel parent context if timeout on sending to results channel #68

Closed
plastikfan opened this issue Sep 1, 2023 · 11 comments
feature New feature or request

Comments

@plastikfan
Contributor

plastikfan commented Sep 1, 2023

This is related to extendio issue 303. If the client needs the results of the worker pool, then it must ensure that the results channel is consumed. However, if the client is miscoded, i.e. it provides a results channel but doesn't consume it, then the worker pool will simply deadlock.

The deadlock can be avoided by introducing a timeout on the worker's send to the results channel. If this timeout occurs, then a parent cancel function should be invoked. To achieve this, the client will need to specify a time duration value representing the required timeout (with a sensible default) and pass in the parent context along with its associated cancel function obtained from context.WithCancel.

Or perhaps, the client only needs to send a parent context that has timeout built into it (context.WithTimeout). Try this option too to see what's best.

In order to complete this, we need a way to detect the execution result of the pool itself. We can do this by introducing a new channel that the pool writes to after it completes. The client then reads the result from this channel after progressing past the wait group. Each worker can send an additional reason back to the pool to indicate the nature of its finish notification. Ordinarily, the worker will finish because there are no more jobs. In this case it will return a nil error. If there is a timeout on sending to the output channel, then it will cancel its parent, then end its loop, sending back a timeout error. With this in place, we can then write meaningful test cases that can discriminate the reason for the end of the worker pool processing and assert accordingly.

The pool result channel must be buffered, so that the pool won't be blocked attempting to write to it. The pool must also write this result before invoking Done on the wait group.

@plastikfan plastikfan added the feature New feature or request label Sep 1, 2023
@plastikfan plastikfan self-assigned this Sep 1, 2023
@plastikfan
Contributor Author

plastikfan commented Sep 3, 2023

Another very subtle point about WithTimeout is that the timer starts as soon as the function returns. The timer should therefore be started by the worker, at the point where it posts the output to the output channel. My initial thought of building the timeout into the parent context is therefore the wrong choice: only the worker knows when the output is being sent. The parent has no idea, nor should it, so there is a disconnect between when the parent would start the timer and when the worker sends the output. Many things can happen between these two points, and it would be silly to try to account for that delay in the send timeout.

So the crux of this point is that the parent sends in its context, the associated cancel function and an output write time duration value. The worker will receive all of these, create its own child context from the parent using WithTimeout and in the select statement, if the timeout is triggered, it invokes the parent cancel function which will trigger a full cancellation of the whole worker pool.

On seeing the timeout, the worker will immediately exit its run loop and send its finished notification to the pool. At this point the pool will not yet be in its drain phase, so it will not read this notification immediately. First it reacts to the cancellation triggered by the worker, at which point it closes the forwarding jobs channel.

If one worker experiences a timeout on the output send, then all others will experience the same timeout. However, all the other workers will see the cancellation and exit their run loop. If the worker is already executing its job at the time the cancellation comes in, then it will not immediately see this cancellation, so there could be a reaction delay if the job is long running (relatively).

The producer will be running under the same parent context, so will also stop producing jobs.

Because Go convention discourages storing a context in a struct, the context has to be passed explicitly as a parameter. This means that implementing this fix will have rippling effects, but this can't be avoided.

@plastikfan
Contributor Author

the following test ....

		FEntry(nil, &poolTE{ // Intermittent
			// timeout on error
			//
			given:           "finish by cancel",
			should:          "receive and process all",
			now:             2,
			interval:        8000,
			outputsChSize:   OutputsChSize,
			after:           time.Second / 5,
			outputChTimeout: time.Second * 10,
			finish:          finishWithCancel,
			summarise:       summariseWithConsumer,
			assert:          assertCancelled,
		}),

fails intermittently:

 ✘ plastikfan@Janus  ~/dev/github/go/snivilised/lorax λ   feat/cancel-parent-on-send-timeout ±  task t
[1695289322] Async Suite - 1/9 specs - 7 procs SSSSSSSP
------------------------------
• [FAILED] [0.882 seconds]
WorkerPool stream of jobs [It] 🧪 ===> given: 'finish by cancel', should: 'receive and process all'
/home/plastikfan/dev/github/go/snivilised/lorax/boost/worker-pool_test.go:359

  Captured StdOut/StdErr Output >>
                ➕➕➕ [[ WaitGroupAssister(🍂 pipeline).Add ]] - gr-name: '✨ producer' (count: '1') (running: '✨ producer')
                ➕➕➕ [[ WaitGroupAssister(🍂 pipeline).Add ]] - gr-name: '🧊 worker pool' (count: '2') (running: '🧊 worker pool/✨ producer')
  >>>> ✨ producer.run ...(ctx:*internal.specContext.WithCancel)
  ===> 🧊 WorkerPool.run ...(ctx:*internal.specContext.WithCancel)
                ➕➕➕ [[ WaitGroupAssister(🍂 pipeline).Add ]] - gr-name: '💠 consumer' (count: '3') (running: '✨ producer/🧊 worker pool/💠 consumer')
                🧭🧭🧭 [[ WaitGroupAssister(🍂 pipeline).Wait ]] - gr-name: '👾 test-main' (count: '3') (running: '✨ producer/🧊 worker pool/💠 consumer')
  <<<< 💠 consumer.run ...(ctx:*internal.specContext.WithCancel)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👹 ogre}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👹 ogre}'
  ===> 🧊 (#workers: '0') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:901c1018-43bd-480c-b03e-4063e43c9d91) [Seq: 1]
        ---> 🚀 worker.run((❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7) ...(ctx:*internal.specContext.WithCancel)
        ---> 🚀 worker.run((❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7)(input:'{👹 ogre}')
                >>> 💤 CancelAfter - Sleeping before requesting cancellation (200ms) ...
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👺 gobby}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👺 gobby}'
  ===> 🧊 (#workers: '1') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(💙)WORKER-ID-1:ad2eaee8-054f-4d8e-ac5c-745c8690deee' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:126e3622-c361-48d3-85d5-934a1e675b3d) [Seq: 2]
        ---> 🚀 worker.run((💙)WORKER-ID-1:ad2eaee8-054f-4d8e-ac5c-745c8690deee) ...(ctx:*internal.specContext.WithCancel)
        ---> 🚀 worker.run((💙)WORKER-ID-1:ad2eaee8-054f-4d8e-ac5c-745c8690deee)(input:'{👺 gobby}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:💩 poo}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:💩 poo}'
  ===> 🧊 (#workers: '2') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:abfae9d3-e153-40d2-ba26-41b01fc971a4) [Seq: 3]
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:💀 skeletor}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:💀 skeletor}'
  ===> 🧊 (#workers: '2') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:24652992-7606-4982-aabf-03c9b77b1d57) [Seq: 4]
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👾 xenomorph}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👾 xenomorph}'
  ===> 🧊 (#workers: '2') WorkerPool.run - new job received
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🤖 rusty}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🤖 rusty}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🧙 gandalf}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🧙 gandalf}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🤖 rusty}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🤖 rusty}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👿 nick}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👿 nick}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👹 ogre}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👹 ogre}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🦄 pegasus}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🦄 pegasus}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🧙 gandalf}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🧙 gandalf}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👺 gobby}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👺 gobby}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:😺 garfield}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:😺 garfield}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:😺 garfield}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:😺 garfield}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:💀 skeletor}'
                >>> CancelAfter - 🛑🛑🛑 cancellation submitted.
                >>> CancelAfter - ➖➖➖ CANCELLED
  <<<< 💠 consumer.run - done received 💔💔💔
                🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '💠 consumer' (count: '2') (running: '✨ producer/🧊 worker pool')
  <<<< 💠 consumer.run - finished (QUIT). 💠💠💠
  >>>> ✨ producer.item - done received ⛔⛔⛔
  >>>> ✨ producer.item, 🔴 item NOT posted: '{Recipient:💀 skeletor}'
                🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '✨ producer' (count: '1') (running: '🧊 worker pool')
  >>>> ✨ producer.run - finished (QUIT). ✨✨✨
  ===> 🧊 (#workers: '2') WorkerPool.run - done received ☢️☢️☢️
  !!!! 🧊 WorkerPool.drain - waiting for remaining workers: 2 (#GRs: 14); 🧊🧊🧊
        ---> 🚀 worker.invoke ⏰ output timeout: '10s'
        ---> 🚀 worker.invoke((❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7)(input:'{💩 poo}')
        ---> 🚀 worker.invoke ⏰ output timeout: '10s'
        ---> 🚀 worker.invoke((❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7) (SENT FINISHED - error:'<nil>'). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker-result-error(<nil>) finished, remaining: '1' 🟥
        ---> 🚀 worker.invoke ⏰ output timeout: '10s'
        ---> 🚀 worker.invoke((💙)WORKER-ID-1:ad2eaee8-054f-4d8e-ac5c-745c8690deee)(cancel) - timeout on send 👿👿👿
        <--- 🚀 worker.run((💙)WORKER-ID-1:ad2eaee8-054f-4d8e-ac5c-745c8690deee) (SENT FINISHED - error:'timeout on send'). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker ((💙)WORKER-ID-1:ad2eaee8-054f-4d8e-ac5c-745c8690deee) 💢💢💢 finished with error: 'timeout on send'
  !!!! 🧊 WorkerPool.drain - worker-result-error(timeout on send) finished, remaining: '0' 🟥
  ===> 🧊 WorkerPool.run - drain complete with error: 'timeout on send' (workers count: '0'). 📛📛📛
                🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '🧊 worker pool' (count: '0') (running: '')
  <--- WorkerPool.run (QUIT). 🧊🧊🧊
  <--- orpheus(alpha) finished Counts >>> (Producer: '16', Consumer: '0'). 🎯🎯🎯
  🎈🎈🎈🎈 remaining count: '0'
  << Captured StdOut/StdErr Output

  Timeline >>
  STEP: 👾 WAIT-GROUP ADD(producer) @ 09/21/23 10:42:03.178
  STEP: 👾 WAIT-GROUP ADD(worker-pool)
   @ 09/21/23 10:42:03.178
  STEP: 👾 WAIT-GROUP ADD(consumer) @ 09/21/23 10:42:03.178
  STEP: 👾 NOW AWAITING TERMINATION @ 09/21/23 10:42:03.179
  [FAILED] in [It] - /home/plastikfan/dev/github/go/snivilised/lorax/boost/worker-pool_test.go:229 @ 09/21/23 10:42:04.058
  << Timeline

  [FAILED] Expected
      <*errors.errorString | 0xc000036070>:
      timeout on send
      {s: "timeout on send"}
  to be nil
  In [It] at: /home/plastikfan/dev/github/go/snivilised/lorax/boost/worker-pool_test.go:229 @ 09/21/23 10:42:04.058
------------------------------

Summarizing 1 Failure:
  [FAIL] WorkerPool stream of jobs [It] 🧪 ===> given: 'finish by cancel', should: 'receive and process all'
  /home/plastikfan/dev/github/go/snivilised/lorax/boost/worker-pool_test.go:229

Ran 1 of 9 Specs in 0.889 seconds
FAIL! -- 0 Passed | 1 Failed | 1 Pending | 7 Skipped


Ginkgo ran 2 suites in 1.541050302s

There were failures detected in the following suites:
  boost ./boost

Test Suite Failed
task: Failed to run task "t": exit status 1

... and this is what we see when the same test succeeds:

 plastikfan@Janus  ~/dev/github/go/snivilised/lorax λ   feat/cancel-parent-on-send-timeout  task t
[1695289302] Async Suite - 1/9 specs - 7 procs SSSSSSSP
------------------------------
• [0.998 seconds]
WorkerPool stream of jobs 🧪 ===> given: 'finish by cancel', should: 'receive and process all'
/home/plastikfan/dev/github/go/snivilised/lorax/boost/worker-pool_test.go:359

  Captured StdOut/StdErr Output >>
                ➕➕➕ [[ WaitGroupAssister(🍂 pipeline).Add ]] - gr-name: '✨ producer' (count: '1') (running: '✨ producer')
                ➕➕➕ [[ WaitGroupAssister(🍂 pipeline).Add ]] - gr-name: '🧊 worker pool' (count: '2') (running: '✨ producer/🧊 worker pool')
                ➕➕➕ [[ WaitGroupAssister(🍂 pipeline).Add ]] - gr-name: '💠 consumer' (count: '3') (running: '💠 consumer/✨ producer/🧊 worker pool')
  <<<< 💠 consumer.run ...(ctx:*internal.specContext.WithCancel)
  ===> 🧊 WorkerPool.run ...(ctx:*internal.specContext.WithCancel)
                🧭🧭🧭 [[ WaitGroupAssister(🍂 pipeline).Wait ]] - gr-name: '👾 test-main' (count: '3') (running: '✨ producer/🧊 worker pool/💠 consumer')
                >>> 💤 CancelAfter - Sleeping before requesting cancellation (200ms) ...
  >>>> ✨ producer.run ...(ctx:*internal.specContext.WithCancel)
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:😺 garfield}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:😺 garfield}'
  ===> 🧊 (#workers: '0') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(❤️)WORKER-ID-0:787906a4-c3ac-401f-9dbb-a44cec36b750' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:ca1d896d-e3d7-48d4-b814-77126941973e) [Seq: 1]
        ---> 🚀 worker.run((❤️)WORKER-ID-0:787906a4-c3ac-401f-9dbb-a44cec36b750) ...(ctx:*internal.specContext.WithCancel)
        ---> 🚀 worker.run((❤️)WORKER-ID-0:787906a4-c3ac-401f-9dbb-a44cec36b750)(input:'{😺 garfield}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👺 gobby}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👺 gobby}'
  ===> 🧊 (#workers: '1') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.spawned new worker: '(💙)WORKER-ID-1:219127f7-34f7-4d1d-9215-9f5ba87fca2f' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:1d8b94aa-1d7a-412e-b55f-6aa2b0d68eae) [Seq: 2]
        ---> 🚀 worker.run((💙)WORKER-ID-1:219127f7-34f7-4d1d-9215-9f5ba87fca2f) ...(ctx:*internal.specContext.WithCancel)
        ---> 🚀 worker.run((💙)WORKER-ID-1:219127f7-34f7-4d1d-9215-9f5ba87fca2f)(input:'{👺 gobby}')
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👽 paul}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👽 paul}'
  ===> 🧊 (#workers: '2') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:758a74cb-8bae-4e95-a92c-f0af2de72672) [Seq: 3]
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👾 xenomorph}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👾 xenomorph}'
  ===> 🧊 (#workers: '2') WorkerPool.run - new job received
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:46befeb7-64b2-46f9-9f08-6c81cbc6fa9b) [Seq: 4]
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👾 xenomorph}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👾 xenomorph}'
  ===> 🧊 (#workers: '2') WorkerPool.run - new job received
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:💀 skeletor}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:💀 skeletor}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🤖 rusty}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🤖 rusty}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👹 ogre}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👹 ogre}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🦄 pegasus}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🦄 pegasus}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🧛‍♀️ dracula}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🧛‍♀️ dracula}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🤖 rusty}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🤖 rusty}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:💩 poo}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:💩 poo}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:💀 skeletor}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:💀 skeletor}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👿 nick}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👿 nick}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🧙 gandalf}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🧙 gandalf}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👾 xenomorph}'
                >>> CancelAfter - 🛑🛑🛑 cancellation submitted.
                >>> CancelAfter - ➖➖➖ CANCELLED
  <<<< 💠 consumer.run - done received 💔💔💔
                🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '💠 consumer' (count: '2') (running: '✨ producer/🧊 worker pool')
  <<<< 💠 consumer.run - finished (QUIT). 💠💠💠
  >>>> ✨ producer.item - done received ⛔⛔⛔
  >>>> ✨ producer.item, 🔴 item NOT posted: '{Recipient:👾 xenomorph}'
                🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '✨ producer' (count: '1') (running: '🧊 worker pool')
  >>>> ✨ producer.run - finished (QUIT). ✨✨✨
  ===> 🧊 (#workers: '2') WorkerPool.run - done received ☢️☢️☢️
  !!!! 🧊 WorkerPool.drain - waiting for remaining workers: 2 (#GRs: 14); 🧊🧊🧊
        ---> 🚀 worker.invoke ⏰ output timeout: '10s'
        ---> 🚀 worker.invoke((❤️)WORKER-ID-0:787906a4-c3ac-401f-9dbb-a44cec36b750)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((❤️)WORKER-ID-0:787906a4-c3ac-401f-9dbb-a44cec36b750)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((❤️)WORKER-ID-0:787906a4-c3ac-401f-9dbb-a44cec36b750) (SENT FINISHED - error:'<nil>'). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker-result-error(<nil>) finished, remaining: '1' 🟥
        ---> 🚀 worker.invoke ⏰ output timeout: '10s'
        ---> 🚀 worker.run((💙)WORKER-ID-1:219127f7-34f7-4d1d-9215-9f5ba87fca2f)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((💙)WORKER-ID-1:219127f7-34f7-4d1d-9215-9f5ba87fca2f) (SENT FINISHED - error:'<nil>'). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker-result-error(<nil>) finished, remaining: '0' 🟥
  ===> 🧊 WorkerPool.run - drain complete OK (workers count: '0'). ☑️☑️☑️
                🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '🧊 worker pool' (count: '0') (running: '')
  <--- WorkerPool.run (QUIT). 🧊🧊🧊
  <--- orpheus(alpha) finished Counts >>> (Producer: '16', Consumer: '0'). 🎯🎯🎯
  🎈🎈🎈🎈 remaining count: '0'
  << Captured StdOut/StdErr Output
------------------------------
 SUCCESS! 1.004574502s
[1695289302] I18n Suite - 0/0 specs - 7 procs  SUCCESS! 6.9192ms

Ginkgo ran 2 suites in 1.654060903s
Test Suite Passed
Detected Programmatic Focus - setting exit status to 197

@plastikfan
Contributor Author

plastikfan commented Sep 21, 2023

Actually, to be clear about the above, we DO want an error to be returned, but the test case is coded to expect no error, which is clearly fixable. The problem here is the intermittency.

To make the unit test more robust, we must make sure that the consumer is not consuming. We can see from the console output that it is consuming, which makes it harder to force the condition we're trying to test. It also looks like the worker is seeing a Done:

  !!!! 🧊 WorkerPool.drain - waiting for remaining workers: 2 (#GRs: 14); 🧊🧊🧊
        ---> 🚀 worker.invoke ⏰ output timeout: '10s'
        ---> 🚀 worker.invoke((❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7)(input:'{💩 poo}')
        ---> 🚀 worker.invoke ⏰ output timeout: '10s'
        ---> 🚀 worker.invoke((❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((❤️)WORKER-ID-0:95382ae3-e83c-45b3-b107-1506ac7c61d7) (SENT FINISHED - error:'<nil>'). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker-result-error(<nil>) finished, remaining: '1' 🟥
        ---> 🚀 worker.invoke ⏰ output timeout: '10s'
        ---> 🚀 worker.invoke((💙)WORKER-ID-1:ad2eaee8-054f-4d8e-ac5c-745c8690deee)(cancel) - timeout on send 👿👿👿
        <--- 🚀 worker.run((💙)WORKER-ID-1:ad2eaee8-054f-4d8e-ac5c-745c8690deee) (SENT FINISHED - error:'timeout on send'). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker ((💙)WORKER-ID-1:ad2eaee8-054f-4d8e-ac5c-745c8690deee) 💢💢💢 finished with error: 'timeout on send'
  !!!! 🧊 WorkerPool.drain - worker-result-error(timeout on send) finished, remaining: '0' 🟥

... and the worker code:

		select {
		case w.outputsChOut <- result:

		case <-parentContext.Done():
			fmt.Printf("	---> 🚀 worker.invoke(%v)(cancel) - done received 💥💥💥\n", w.id)

		case <-outputContext.Done():
			fmt.Printf("	---> 🚀 worker.invoke(%v)(cancel) - timeout on send 👿👿👿\n", w.id)

			// ??? err = i18n.NewOutputChTimeoutError()
			err = errors.New("timeout on send")

			parentCancel()
		}

The 'done received 💥💥💥' case is short-circuiting the select and does not allow the timeout to occur. However, we can also see that a send timeout still occurs, but on a different worker. So the timeout is working; the problem is the way the worker pool reports its final error, because of the race condition between the workers. The pool has to handle the worker cancellations and correctly decide what the proper error result should be.

@plastikfan
Contributor Author

But sometimes we see this:

  <<<< 💠 consumer.run - done received 💔💔💔
                🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '💠 consumer' (count: '1') (running: '🧊 worker pool')
  <<<< 💠 consumer.run - finished (QUIT). 💠💠💠
        ---> 🚀 worker.invoke ⏰ output timeout: '10s'
        ---> 🚀 worker.run((💙)WORKER-ID-1:d5594858-424a-4ef7-b06b-9edeed55090b)(input:'{👹 ogre}')
        ---> 🚀 worker.invoke ⏰ output timeout: '10s'
        ---> 🚀 worker.run((❤️)WORKER-ID-0:20245a9a-3414-4317-a8a2-13563842bf52)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((❤️)WORKER-ID-0:20245a9a-3414-4317-a8a2-13563842bf52) (SENT FINISHED - error:'<nil>'). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker-result-error(<nil>) finished, remaining: '1' 🟥
        ---> 🚀 worker.invoke ⏰ output timeout: '10s'
        ---> 🚀 worker.run((💙)WORKER-ID-1:d5594858-424a-4ef7-b06b-9edeed55090b)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((💙)WORKER-ID-1:d5594858-424a-4ef7-b06b-9edeed55090b) (SENT FINISHED - error:'<nil>'). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker-result-error(<nil>) finished, remaining: '0' 🟥
  ===> 🧊 WorkerPool.run - drain complete OK (workers count: '0'). ☑️☑️☑️
                🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '🧊 worker pool' (count: '0') (running: '')
  <--- WorkerPool.run (QUIT). 🧊🧊🧊
  <--- orpheus(alpha) finished Counts >>> (Producer: '17', Consumer: '1'). 🎯🎯🎯
  🎈🎈🎈🎈 remaining count: '0'

... there is no timeout on send, but also no done received. I think we need to fix 2 problems here: first, how the pool handles worker termination notifications; second, the consumer shouldn't be consuming.

@plastikfan
Contributor Author

plastikfan commented Sep 21, 2023

In the unit tests, we need to make it clear what the interval relates to. At the moment, having a variable called interval is non-specific. We need separate producer and consumer intervals to make this much clearer.

producer:

pipe.produce(parentContext, interval, provider)

consumer:

pipe.consume(parentContext)

This needs to be fixed. The consumer is not configured with an interval. But we can see the extent to which the consumer is consuming by looking at the consumer count at the end, e.g.:

 <--- orpheus(alpha) finished Counts >>> (Producer: '17', Consumer: '1'). 🎯🎯🎯

@plastikfan
Contributor Author

Need to turn the intervals into durations as they are currently just an int.

@plastikfan
Contributor Author

plastikfan commented Sep 21, 2023

After refactoring the test durations, we have this failing test (the same one as indicated before), but now:

		FEntry(nil, &poolTE{ // Intermittent failure
			given:  "results not consumed in a timely manner",
			should: "timeout on error",
			now:    2,
			intervals: durations{
				consumer:        time.Second * 10,
				finishAfter:     time.Second * 2,
				outputChTimeout: time.Second * 1,
			},
			outputsChSize: OutputsChSize,
			finish:        finishWithCancel,
			summarise:     summariseWithConsumer,
			assert:        assertCancelled,
		}),

which results in failure (no error is returned, even though we expect a timeout):

  >>>> ✨ producer.run - finished (QUIT). ✨✨✨
        ---> 🚀 worker.invoke ⏰ output timeout: '1s'
        ---> 🚀 worker.invoke((💙)WORKER-ID-1:869d76ab-4ee5-4244-afe9-51dd3e77d4a0)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((💙)WORKER-ID-1:869d76ab-4ee5-4244-afe9-51dd3e77d4a0)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((💙)WORKER-ID-1:869d76ab-4ee5-4244-afe9-51dd3e77d4a0) (SENT FINISHED - error:'<nil>'). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker-result-error(<nil>) finished, remaining: '1' 🟥
        ---> 🚀 worker.invoke ⏰ output timeout: '1s'
        ---> 🚀 worker.invoke((❤️)WORKER-ID-0:bcb359d0-3363-41c1-b758-7d528c358923)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((❤️)WORKER-ID-0:bcb359d0-3363-41c1-b758-7d528c358923)(input:'{👿 nick}')
        ---> 🚀 worker.invoke ⏰ output timeout: '1s'
        ---> 🚀 worker.invoke((❤️)WORKER-ID-0:bcb359d0-3363-41c1-b758-7d528c358923)(cancel) - done received 💥💥💥
        ---> 🚀 worker.run((❤️)WORKER-ID-0:bcb359d0-3363-41c1-b758-7d528c358923)(finished) - done received 🔶🔶🔶
        <--- 🚀 worker.run((❤️)WORKER-ID-0:bcb359d0-3363-41c1-b758-7d528c358923) (SENT FINISHED - error:'<nil>'). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker-result-error(<nil>) finished, remaining: '0' 🟥
  ===> 🧊 WorkerPool.run - drain complete OK (workers count: '0'). ☑️☑️☑️
                🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '🧊 worker pool' (count: '1') (running: '💠 consumer')
  <--- WorkerPool.run (QUIT). 🧊🧊🧊
  <<<< 💠 consumer.run - done received 💔💔💔
                🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '💠 consumer' (count: '0') (running: '')
  <<<< 💠 consumer.run - finished (QUIT). 💠💠💠
  <--- orpheus(alpha) finished Counts >>> (Producer: '24', Consumer: '0'). 🎯🎯🎯
  🎈🎈🎈🎈 remaining count: '0'
  << Captured StdOut/StdErr Output

  Timeline >>
  STEP: 👾 WAIT-GROUP ADD(producer) @ 09/21/23 16:04:05.001
  STEP: 👾 WAIT-GROUP ADD(worker-pool)
   @ 09/21/23 16:04:05.001
  STEP: 👾 WAIT-GROUP ADD(consumer) @ 09/21/23 16:04:05.001
  STEP: 👾 NOW AWAITING TERMINATION @ 09/21/23 16:04:05.001
  [FAILED] in [It] - /home/plastikfan/dev/github/go/snivilised/lorax/boost/worker-pool_test.go:237 @ 09/21/23 16:04:15.004
  << Timeline

  [FAILED] Expected
      <nil>: nil
  not to be nil
  In [It] at: /home/plastikfan/dev/github/go/snivilised/lorax/boost/worker-pool_test.go:237 @ 09/21/23 16:04:15.004
------------------------------

Summarizing 1 Failure:
  [FAIL] WorkerPool stream of jobs [It] 🧪 ===> given: 'finish by cancel', should: 'receive and process all'
  /home/plastikfan/dev/github/go/snivilised/lorax/boost/worker-pool_test.go:237

Ran 1 of 9 Specs in 10.013 seconds
FAIL! -- 0 Passed | 1 Failed | 1 Pending | 7 Skipped

What this shows us is:

  • there is no timeout error
  • the consumer is not consuming (<--- orpheus(alpha) finished Counts >>> (Producer: '24', Consumer: '0'). 🎯🎯🎯)
  • we have set a consumer interval of 10 seconds (Ran 1 of 9 Specs in 10.013 seconds)
  • output channel timeout of 1 second ( ---> 🚀 worker.invoke ⏰ output timeout: '1s')
  • producer submits a cancellation request after 2 seconds

With these attributes, one would expect the worker to bail out after spending a second attempting to send the result to the output channel, but it doesn't. The thing that controls the test run length is the consumer interval, so our worker is not timing out.

The relevant worker code:

func (w *worker[I, O]) invoke(parentContext context.Context,
	parentCancel context.CancelFunc,
	outputChTimeout time.Duration,
	job Job[I],
) error {
	var err error

	outputContext, cancel := context.WithTimeout(parentContext, outputChTimeout)
	defer cancel()

	result, _ := w.exec(job)

	if w.outputsChOut != nil {
		fmt.Printf("	---> 🚀 worker.invoke ⏰ output timeout: '%v'\n", outputChTimeout)

		select {
		case w.outputsChOut <- result:

		case <-parentContext.Done():
			fmt.Printf("	---> 🚀 worker.invoke(%v)(cancel) - done received 💥💥💥\n", w.id)

		case <-outputContext.Done():
			// THIS SHOULD TIMEOUT, BUT DOES NOT, THIS IS HERE TO ENSURE
			// THAT THE RESULT SEND ON w.outputsChOut HAPPENS IN A TIMELY
			// MANNER.
			fmt.Printf("	---> 🚀 worker.invoke(%v)(cancel) - timeout on send 👿👿👿\n", w.id)

			err = errors.New("timeout on send")

			parentCancel()
		}
	}

	return err
}

plastikfan added a commit that referenced this issue Sep 22, 2023
ref(boost): minor refactorings (#68)

feat(boost): return a pool result (#68)

feat(boost): invoke parent cancel on output timeout (#68)

feat(boost): fix up 18n (#68)
@plastikfan
Contributor Author

This looks to have been fixed by issue #91, i.e. by using the atomic.AddInt32 function to increment and decrement the wait group counter instead of the unsafe ++/-- operators.
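As a rough illustration of that kind of fix, the sketch below keeps a goroutine count next to a sync.WaitGroup and updates it only through atomic.AddInt32. The counter type and runTracked function are hypothetical and are not the lorax implementation; the assumption is simply that the count is an int32 shared between goroutines. A plain `n++` or `n--` on such a field is an unsynchronised read-modify-write, so concurrent updates can be lost and the race detector would flag it.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counter is a hypothetical stand-in for a goroutine count that is
// tracked alongside a sync.WaitGroup.
type counter struct{ n int32 }

func (c *counter) inc() int32  { return atomic.AddInt32(&c.n, 1) }
func (c *counter) dec() int32  { return atomic.AddInt32(&c.n, -1) }
func (c *counter) load() int32 { return atomic.LoadInt32(&c.n) }

// runTracked starts n goroutines, counting each one in before it starts
// and out as it exits, and returns the count once all have finished.
func runTracked(n int) int32 {
	var (
		c  counter
		wg sync.WaitGroup
	)

	for i := 0; i < n; i++ {
		wg.Add(1)
		c.inc()

		go func() {
			// Deferred calls run last-in first-out, so the count is
			// decremented before wg.Done releases the waiter.
			defer wg.Done()
			defer c.dec()
		}()
	}

	wg.Wait()

	return c.load()
}

func main() {
	fmt.Println(runTracked(1000)) // prints 0
}
```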

With fix:

✘ plastikfan@Hydra   λ   master ●  task t
[1698955014] Async Suite - 1/9 specs - 7 procs SSPSSSSS
------------------------------
• [1.540 seconds]
WorkerPool stream of jobs 🧪 ===> given: 'finish by stop', should: 'timeout and abort'
/Users/plastikfan/dev/github/snivilised/lorax/boost/worker-pool_test.go:447

  Captured StdOut/StdErr Output >>
  		➕➕➕ [[ WaitGroupAssister(🍂 pipeline).Add ]] - gr-name: '✨ producer' (count: '1') (running: '✨ producer')

  		➕➕➕ [[ WaitGroupAssister(🍂 pipeline).Add ]] - gr-name: '🧊 worker pool' (count: '2') (running: '✨ producer/🧊 worker pool')

  >>>> ✨ producer.run ...(ctx:*internal.specContext.WithCancel)
  		➕➕➕ [[ WaitGroupAssister(🍂 pipeline).Add ]] - gr-name: '💠 consumer' (count: '3') (running: '✨ producer/🧊 worker pool/💠 consumer')

  >>>> ✨ producer.run - default (running: true) ...
  		🧭🧭🧭 [[ WaitGroupAssister(🍂 pipeline).Wait ]] - gr-name: '👾 test-main' (count: '3') (running: '💠 consumer/✨ producer/🧊 worker pool')

  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👺 gobby}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👺 gobby}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👾 xenomorph}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👾 xenomorph}'
  >>>> ✨ producer.run - default (running: true) ...
  		>>> 💤 StopAfter - Sleeping before requesting stop (200ms) ...
  ===> 🧊 WorkerPool.run ...(ctx:*internal.specContext.WithCancel)

  ===> 🧊 (#workers: '0') WorkerPool.run - new job received
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👻 caspar}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👻 caspar}'
  	---> 🚀 worker.run((❤️)WORKER-ID-0:61272c25-a5f7-44cd-b3bc-58ab89c54754) ...(ctx:*internal.specContext.WithCancel)

  >>>> ✨ producer.run - default (running: true) ...
  <<<< 💠 consumer.run ...(ctx:*internal.specContext.WithCancel)
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👽 paul}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👽 paul}'
  ===> 🧊 WorkerPool.spawned new worker: '(❤️)WORKER-ID-0:61272c25-a5f7-44cd-b3bc-58ab89c54754' 🎀🎀🎀
  >>>> ✨ producer.run - default (running: true) ...
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:2dd2cdef-8e7b-4d2c-9b12-d04fdd7f00aa) [Seq: 1]
  ===> 🧊 (#workers: '1') WorkerPool.run - new job received
  	---> 🚀 worker.run((❤️)WORKER-ID-0:61272c25-a5f7-44cd-b3bc-58ab89c54754)(input:'{👺 gobby}')
  ===> 🧊 WorkerPool.spawned new worker: '(💙)WORKER-ID-1:36dd5aa8-d85f-408a-9f48-89c17c0e4227' 🎀🎀🎀
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:ee186a27-243a-49f6-bc14-7628a5d08b44) [Seq: 2]
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:💩 poo}'
  	---> 🚀 worker.run((💙)WORKER-ID-1:36dd5aa8-d85f-408a-9f48-89c17c0e4227) ...(ctx:*internal.specContext.WithCancel)

  	---> 🚀 worker.run((💙)WORKER-ID-1:36dd5aa8-d85f-408a-9f48-89c17c0e4227)(input:'{👾 xenomorph}')
  ===> 🧊 (#workers: '2') WorkerPool.run - new job received
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:💩 poo}'
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:966167da-9539-4094-b532-eca516d52fc8) [Seq: 3]
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🧛 dracula}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🧛 dracula}'
  ===> 🧊 (#workers: '2') WorkerPool.run - new job received
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🧛 dracula}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🧛 dracula}'
  >>>> ✨ producer.run - default (running: true) ...
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:ea92a4b1-af8a-40f5-b900-f9496ffb688a) [Seq: 4]
  ===> 🧊 (#workers: '2') WorkerPool.run - new job received
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👺 gobby}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👺 gobby}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🐉 smaug}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🐉 smaug}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:🤖 rusty}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:🤖 rusty}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👿 nick}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👿 nick}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:👹 ogre}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:👹 ogre}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:💀 skeletor}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:💀 skeletor}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:😺 garfield}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:😺 garfield}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:😺 garfield}'
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:😺 garfield}'
  >>>> ✨ producer.run - default (running: true) ...
  >>>> ✨ producer.item, 🟠 waiting to post item: '{Recipient:😺 garfield}'
  >>>> 🧲 producer terminating ...
  	---> 🚀 worker.invoke ⏰ output timeout: '1ms'
  	---> 🚀 worker.run((❤️)WORKER-ID-0:61272c25-a5f7-44cd-b3bc-58ab89c54754)(input:'{👻 caspar}')
  <<<< 💠 consumer.run - new result arrived(#1): '			---> 🍉🍉🍉 [Seq: 1] Hello: '👺 gobby''
  ===> 🧊 WorkerPool.run - forwarded job 🧿🧿🧿(JOB-ID:99fb7485-4f27-4670-afde-76eb0a10fdcf) [Seq: 5]
  ===> 🧊 (#workers: '2') WorkerPool.run - new job received
  >>>> ✨ producer.item, 🟢 posted item: '{Recipient:😺 garfield}'
  >>>> ✨ producer.run - termination detected (running: false)
  		🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '✨ producer' (count: '2') (running: '💠 consumer/🧊 worker pool')

  >>>> ✨ producer.run - finished (QUIT). ✨✨✨
  		>>> StopAfter - 🍧🍧🍧 stop submitted.
  	---> 🚀 worker.invoke ⏰ output timeout: '1ms'
  	---> 🚀 worker.invoke((💙)WORKER-ID-1:36dd5aa8-d85f-408a-9f48-89c17c0e4227)(cancel) - timeout on send 👿👿👿
  	<--- 🚀 worker.run((💙)WORKER-ID-1:36dd5aa8-d85f-408a-9f48-89c17c0e4227) (SENT FINISHED - error:'timeout on send'). 🚀🚀🚀
  <<<< 💠 consumer.run - done received 💔💔💔
  		🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '💠 consumer' (count: '1') (running: '🧊 worker pool')

  <<<< 💠 consumer.run - finished (QUIT). 💠💠💠
  ===> 🧊 (#workers: '2') WorkerPool.run - done received ☢️☢️☢️
  !!!! 🧊 WorkerPool.drain - waiting for remaining workers: 2 (#GRs: 13); 🧊🧊🧊
  !!!! 🧊 WorkerPool.drain - worker ((💙)WORKER-ID-1:36dd5aa8-d85f-408a-9f48-89c17c0e4227) 💢💢💢 finished with error: 'timeout on send'
  !!!! 🧊 WorkerPool.drain - worker-result-error(timeout on send) finished, remaining: '1' 🟥
  	---> 🚀 worker.invoke ⏰ output timeout: '1ms'
  	---> 🚀 worker.invoke((❤️)WORKER-ID-0:61272c25-a5f7-44cd-b3bc-58ab89c54754)(cancel) - done received 💥💥💥
  	---> 🚀 worker.run((❤️)WORKER-ID-0:61272c25-a5f7-44cd-b3bc-58ab89c54754)(input:'{👽 paul}')
  	---> 🚀 worker.invoke ⏰ output timeout: '1ms'
  	---> 🚀 worker.run((❤️)WORKER-ID-0:61272c25-a5f7-44cd-b3bc-58ab89c54754)(finished) - done received 🔶🔶🔶
  	<--- 🚀 worker.run((❤️)WORKER-ID-0:61272c25-a5f7-44cd-b3bc-58ab89c54754) (SENT FINISHED - error:'<nil>'). 🚀🚀🚀
  !!!! 🧊 WorkerPool.drain - worker-result-error(<nil>) finished, remaining: '0' 🟥
  ===> 🧊 WorkerPool.run - drain complete with error: 'timeout on send' (workers count: '0'). 📛📛📛
  		🚩🚩🚩 [[ WaitGroupAssister(🍂 pipeline).Done ]] - gr-name: '🧊 worker pool' (count: '0') (running: '')

  <--- WorkerPool.run (QUIT). 🧊🧊🧊

  <--- orpheus(alpha) finished Counts >>> (Producer: '16', Consumer: '1'). 🎯🎯🎯
  🎈🎈🎈🎈 remaining count: '0'
  << Captured StdOut/StdErr Output
------------------------------
 SUCCESS! 1.570343s

@plastikfan
Contributor Author

The remaining work on this is to check the deactivated tests to ensure they all execute properly.

@plastikfan
Contributor Author

plastikfan commented May 22, 2024

We may be able to improve the design of this feature. We should not have to pass in the parent cancel function to the child. All we need to do is set a timeout on the parent context using context.WithTimeout.

@plastikfan plastikfan reopened this May 22, 2024
@plastikfan
Contributor Author

This should be addressed in a fresh issue.
