Skip to content

Commit

Permalink
Fixed infinite sync generator problem for mapGenerator
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolas-van committed Nov 3, 2021
1 parent 3a54e5b commit 72bb45b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 1 deletion.
12 changes: 12 additions & 0 deletions src/filterGenerator.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

import { expect, test } from '@jest/globals'
import filterGenerator from './filterGenerator.mjs'
import { range } from 'itertools'

test('filterGenerator base', async () => {
const res = []
for await (const el of filterGenerator(range(6), async (x) => x % 2 === 0)) {
res.push(el)
}
expect(res).toEqual([0, 2, 4])
})
5 changes: 4 additions & 1 deletion src/mapGenerator.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,10 @@ async function * mapGenerator (iterable, iteratee, concurrencyOrQueue = 1, order
assert(removed, 'waitList does not contain identifier')
}

let scheduledCount = 0
const scheduledList = new Map()
const schedule = (index, value) => {
scheduledCount += 1
const task = {
value,
index,
Expand All @@ -94,6 +96,7 @@ async function * mapGenerator (iterable, iteratee, concurrencyOrQueue = 1, order
const [state, result] = await iteratee(value, index, iterable)
.then((r) => ['resolved', r], (e) => ['rejected', e])

scheduledCount -= 1
insertInResults(index, value, state, result)
if (state === 'rejected') {
shouldStop = true
Expand Down Expand Up @@ -175,7 +178,7 @@ async function * mapGenerator (iterable, iteratee, concurrencyOrQueue = 1, order
hasFetchedValue = false
fetchedValue = null
}
if (!fetching && !exhausted && !shouldStop) {
if (!fetching && !exhausted && !shouldStop && scheduledCount <= queue.concurrency + 1) {
fetch()
}
}
Expand Down
28 changes: 28 additions & 0 deletions src/mapGenerator.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,31 @@ test('mapGenerator unordered fail in fetch cancels sheduled tasks', async () =>
await delay()
expect(callList).toStrictEqual([])
})

test('mapGenerator infinite sync operator', async () => {
let shouldStop = false
const infiniteSyncGenerator = function * () {
let i = 0
while (true) {
if (shouldStop) {
return
}
yield i
i += 1
}
}
sleep(10).then(() => {
shouldStop = true
})
const results = []
for await (const el of mapGenerator(infiniteSyncGenerator(), async (el) => {
await sleep(1)
return el * 2
})) {
results.push(el)
}
expect(results.length).toBeGreaterThanOrEqual(3)
expect(results[0]).toStrictEqual(0)
expect(results[1]).toStrictEqual(2)
expect(results[2]).toStrictEqual(4)
})

0 comments on commit 72bb45b

Please sign in to comment.