Accept iterables and async iterables #72

Merged
merged 7 commits into from
Jul 2, 2023

m93a
Contributor

@m93a m93a commented May 3, 2023

Fixes #63, fixes #71. Task list:

  • Implement the functionality
  • Assure existing tests pass
  • Add new tests
  • Add documentation

NOTE: This PR changes the return type of pool.items() from the narrower type T[] to the wider type T[] | Iterable<T> | AsyncIterable<T>, which might be a ⚠️ breaking change ⚠️ for some TypeScript codebases!
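
To illustrate the typing impact, here is a minimal sketch. `PoolItems` and `knownLength` are hypothetical names, not part of the library; only the union type itself comes from the note above. Code that previously read array-only members such as `.length` off `pool.items()` would need a narrowing step along these lines:

```typescript
// Hypothetical alias for the widened return type described in the note above.
type PoolItems<T> = T[] | Iterable<T> | AsyncIterable<T>

// Hypothetical helper: only arrays expose a length up front, so callers
// have to narrow before touching array-only members.
function knownLength<T>(items: PoolItems<T>): number | undefined {
  return Array.isArray(items) ? items.length : undefined
}

console.log(knownLength([10, 20, 30]))      // 3
console.log(knownLength(new Set([10, 20]))) // undefined
```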

@m93a
Contributor Author

m93a commented May 3, 2023

Right now, the following test fails:

test('stops the pool from async error handler', async () => {
  const timeouts = [10, 20, 30, 40, 50]

  const { results } = await PromisePool
    .for(timeouts)
    .withConcurrency(2)
    .handleError(async (_, __, pool) => {
      pool.stop()
    })
    .process(async (timeout) => {
      if (timeout < 30) {
        throw new Error('stop the pool')
      }

      await pause(timeout)
      return timeout
    })

  expect(results).toEqual([30])
})

@marcuspoehls Could you help me understand why [30] is the expected result here? Since the concurrency is set to 2, I would expect timeouts 10 and 20 to start getting processed and then fail. Therefore, the array should be empty, and not [30]. Is this related to #63?

@marcuspoehls
Member

@m93a Good question. Intuitively, I agree with you: the result should be []. It could be related to #63. Let me think over the test and why I asserted [30] instead of [] as the result.

If you want, you can adjust the mentioned test case to match your expected assertion (an empty array). Does it succeed then? Maybe you’re right and the test is wrong.

@m93a
Contributor Author

m93a commented May 3, 2023

> Does it succeed then?

Yes, if I assert an empty array, the test does succeed!

> Could be related to #63. Let me think over the test and why I asserted [30] instead of [] as the result.

If you're anything like me, maybe you just ran the test, saw the result, thought "oh, of course it's [30]", and changed the test? Sadly, I do this all the time 😁 Anyway, if my thinking is correct, the PR adds an asynchronous gap between the end of one task and the scheduling of the next, which makes it possible to stop the next task in time. However, I ran the code example from #63 and my PR still didn't fix it, so it only fixes this small peculiarity.

Let me know if you figure out why the test asserted [30] as the correct result! In the meantime I'm going to continue with the PR assuming that [] is correct! 😊

@marcuspoehls
Member

Yes, please continue with the assumption that you fixed an issue in the test suite by adding the new functionality 😃👍

@m93a
Contributor Author

m93a commented May 3, 2023

The implementation I wrote first was a bit unintuitive when it comes to calling next on the iterable: it always pulled one more item than it needed at the moment. For example, if you had an iterable yielding the elements (10, 20, 30, 40) and used it in a pool with concurrency 2, the pool would consume the first three elements (10, 20, 30) and start processing the first two of them (10, 20). Once either finished, the pool would consume the next item from the iterable (40) and start processing the buffered one (30). There would always be a buffer of one item.
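
The buffering effect can be reproduced with a toy loop. This is a sketch under stated assumptions, not the library's executor: `pause`, `source`, `pulledWhenFirstTaskEnds`, and the `waitAtStart` flag are hypothetical names, and the concurrency is hard-coded to 2. The flag switches between waiting for a free slot right after pulling an item and waiting at the end of the cycle.

```typescript
const pause = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Hypothetical source that records every item the pool pulls from it.
function* source(pulled: number[]): Generator<number> {
  for (const n of [10, 20, 30, 40]) {
    pulled.push(n)
    yield n
  }
}

// Runs a toy pool with concurrency 2 and returns what had been pulled
// from the source at the moment the first task finished.
async function pulledWhenFirstTaskEnds(waitAtStart: boolean): Promise<number[]> {
  const pulled: number[] = []
  const active = new Set<Promise<void>>()
  let snapshot: number[] | undefined

  const waitForSlot = async () => {
    while (active.size >= 2) await Promise.race(active)
  }

  for (const item of source(pulled)) {
    // Waiting here means the item has already been pulled but cannot start yet.
    if (waitAtStart) await waitForSlot()

    const task: Promise<void> = pause(item).then(() => {
      snapshot = snapshot ?? [...pulled]
      active.delete(task)
    })
    active.add(task)

    // Waiting here means the next item is pulled only once a slot is free.
    if (!waitAtStart) await waitForSlot()
  }

  await Promise.all(active)
  return snapshot ?? []
}
```

Under these assumptions, `pulledWhenFirstTaskEnds(true)` resolves to [10, 20, 30] (one buffered item), while `pulledWhenFirstTaskEnds(false)` resolves to [10, 20].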

I thought this would be a footgun: developers would surely assume the pool only advances the iterable once it needs the next item, and violating that intuition would lead to logic errors. So I moved the asynchronous waitForProcessingSlot to the end of the cycle. This made the lazy iteration make more sense, but it also had one unforeseen side effect: since process now checks isStopped after the asynchronous gap, not before it, it is more conservative about starting new tasks. This means that it won't start a new task on a stopped pool anymore, which fixes #63! I'll write some more tests to confirm everything works correctly, but so far it's looking good!
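
The ordering described here (check the stop flag, start the task, then wait for a slot) can be sketched as follows. This is a simplified stand-in, not the library's code: `runPool`, `pause`, and the `stopped` flag are hypothetical, and the `catch` branch plays the role of an error handler that calls pool.stop().

```typescript
const pause = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

// Toy pool, NOT the library's executor: it stops on the first error.
async function runPool<T, R>(
  items: Iterable<T> | AsyncIterable<T>,
  concurrency: number,
  handler: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = []
  const active = new Set<Promise<void>>()
  let stopped = false

  for await (const item of items) {
    if (stopped) break // checked after the previous cycle's asynchronous gap

    const task: Promise<void> = handler(item)
      .then((result) => { results.push(result) })
      .catch(() => { stopped = true }) // stands in for handleError + pool.stop()
      .finally(() => { active.delete(task) })
    active.add(task)

    // The asynchronous gap sits at the end of the cycle.
    while (!stopped && active.size >= concurrency) {
      await Promise.race(active)
    }
  }

  await Promise.all(active)
  return results
}

// Same inputs as the 'stops the pool from async error handler' test.
const run = runPool([10, 20, 30, 40, 50], 2, async (timeout) => {
  if (timeout < 30) throw new Error('stop the pool')
  await pause(timeout)
  return timeout
})
run.then((results) => console.log(results)) // logs []
```

In this sketch the task for 30 is never started, because the stop flag is already set by the time the loop reaches it.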


This change (obviously) broke another test:

test('useCorrespondingResults defaults results to notRun symbol', async () => {
  const timeouts = [20, undefined, 10, 100]

  const { results } = await PromisePool
    .withConcurrency(1)
    .for(timeouts)
    .handleError((_error, _index, pool) => {
      pool.stop()
    })
    .useCorrespondingResults()
    .process(async (timeout) => {
      if (timeout) {
        await pause(timeout)
        return timeout
      }
      throw new Error('did not work')
    })

  expect(results).toEqual([
    20,
    PromisePool.failed,
    10,
    PromisePool.notRun,
  ])
})

The pool in question has concurrency 1, so it should have stopped immediately after the failure (as per #63). However, the bug previously caused another timeout to be processed, hence the lonely 10 in the test's expected result. After my fix and changing the expected result to the more intuitive

expect(results).toEqual([
  20,
  PromisePool.failed,
  PromisePool.notRun,
  PromisePool.notRun,
])

the test passes.
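
Why the third slot ends up as notRun can be seen in a toy model of a concurrency-1 pool. This is only a sketch: the local `failed` and `notRun` symbols stand in for PromisePool.failed and PromisePool.notRun, `runSequentially` is a hypothetical helper, and the pause from the test is left out for brevity.

```typescript
// Local stand-ins for PromisePool.failed and PromisePool.notRun.
const failed = Symbol('failed')
const notRun = Symbol('notRun')

// Toy model of a concurrency-1 pool with corresponding results that
// stops on the first error. Not the library's implementation.
async function runSequentially<T, R>(
  items: T[],
  handler: (item: T) => Promise<R>,
): Promise<Array<R | symbol>> {
  // The length of an array is known, so every slot can start as notRun.
  const results: Array<R | symbol> = items.map(() => notRun)
  let stopped = false

  for (let index = 0; index < items.length && !stopped; index++) {
    try {
      results[index] = await handler(items[index])
    } catch (_error) {
      results[index] = failed
      stopped = true // nothing after the failing item gets started
    }
  }

  return results
}

const outcome = runSequentially([20, undefined, 10, 100], async (timeout) => {
  if (timeout) return timeout
  throw new Error('did not work')
})
outcome.then((results) => console.log(results))
```

Here the resolved array holds 20, the `failed` symbol, and then the `notRun` symbol twice, matching the updated assertion.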

@m93a m93a marked this pull request as ready for review May 3, 2023 23:59
@m93a
Contributor Author

m93a commented May 9, 2023

Hey, just wanted to let you know that the PR is good to go whenever you have some time!

I haven't gotten around to adding the feature to the docs yet, but I realized they're actually in a separate repo. So, I thought it would be best to create another PR there once this one gets accepted, right?

@marcuspoehls
Member

@m93a thanks for the heads up 😃 I’ll look through the changes as soon as I can, probably on the weekend

@marcuspoehls marcuspoehls added the enhancement New feature or request label May 16, 2023
Member

@marcuspoehls marcuspoehls left a comment

@m93a Thank you for your great work. Looks really nice. And you also fixed the issue where one extra item is processed. Really nice 🙌

I left some minor comments with requests for changes. Please have a look and then let's get this pull request merged 🙂

src/promise-pool-executor.ts
this.timeout = undefined
this.concurrency = 10
this.shouldResultsCorrespond = false
- this.items = items ?? []
+ this.items = items ?? [] as any
Member

Let’s remove the as any cast here too, please.

Contributor Author

Thanks for catching this! It's a leftover from earlier, when I tried to hack around narrowing the type of items, but it didn't work out. I'll remove the cast!

Member

Thanks 👍

if (this.shouldUseCorrespondingResults()) {
  this.results()[index] = PromisePool.notRun
}

await this.waitForProcessingSlot()
Member

I guess we can remove this and only keep the waitForProcessingSlot at the end of this loop. This one doesn’t have any effect, does it?

Contributor Author

@m93a m93a May 16, 2023

IIRC, this does nothing when items is an array, but if it's an iterable it's quite vital. Then, the results array cannot be pre-filled with notRun as we don't know the length (it could well be infinite). Imagine a scenario with an iterable containing 1,2,3,4,5, the pool starts processing items 1,2,3, then 1 and 3 finish and then the user stops the pool. The result would be [1, notRun, 3]. If this code were to be removed, the second item would either be undefined or <empty slot> (not exactly sure how JS works here). I can add a comment explaining why the statement is needed, tho?
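
On the JavaScript question raised in this comment: assigning past the end of an array leaves a hole (an empty slot), and reading a hole yields undefined. A small sketch with a hypothetical `results` array:

```typescript
// Results arrive out of order: the items at index 0 and 2 finished, index 1 did not.
const results: number[] = []
results[0] = 1
results[2] = 3

console.log(results.length) // 3
console.log(1 in results)   // false: index 1 is a hole, not a stored value
console.log(results[1])     // undefined: reading a hole yields undefined
```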

Member

This pull request changes the behavior of prefilling the results array to only prefill when items are an array. We’re not prefilling if the items come from an iterable. And we’re only prefilling for arrays if we want corresponding results.

I tested it: the tests still succeed when this line, which waits for a processing slot, is removed. Keeping the wait at the end of the loop is crucial, though.

@marcuspoehls marcuspoehls merged commit 9a29f0d into supercharge:main Jul 2, 2023
5 checks passed
Labels
enhancement New feature or request
2 participants