Skip to content

Commit

Permalink
fixed bug
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolas-van committed Oct 19, 2021
1 parent 08a6d05 commit 8ed9fcd
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 54 deletions.
75 changes: 29 additions & 46 deletions src/asyncGeneratorMap.mjs
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@

import assert_ from 'nanoassert'
import assert from 'nanoassert'
import asyncGeneratorWrap from './asyncGeneratorWrap.mjs'
import CancelledError from './CancelledError.mjs'

const assert = (test, str) => {
if (!test) {
console.log(str)
}
assert_(test, str)
}

/**
* @ignore
* @param {*} asyncIterable ignore
Expand All @@ -31,31 +24,33 @@ async function * asyncGeneratorMap (asyncIterable, iteratee, queue, ordered = tr
let fetching = false
let exhausted = false

const waitList = []
const waitList = new Map()
let currentWaitListInternalIdentifier = 0
const addToWaitList = (identifier, fct) => {
const waitListIdentifier = currentWaitListInternalIdentifier
const p = (async () => {
try {
return [identifier, 'resolved', await fct()]
return [waitListIdentifier, identifier, 'resolved', await fct()]
} catch (e) {
return [identifier, 'rejected', e]
return [waitListIdentifier, identifier, 'rejected', e]
}
})()
waitList.push([identifier, p])
waitList.set(waitListIdentifier, p)
currentWaitListInternalIdentifier += 1
}
const raceWaitList = async () => {
while (true) {
const [identifier, state, result] = await Promise.race(waitList.map(([k, v]) => v))
removeFromWaitList(identifier)
assert(waitList.size >= 1, 'Can not race on empty list')
const [waitListIdentifier, identifier, state, result] = await Promise.race(waitList.values())
removeFromWaitList(waitListIdentifier)
if (state === 'rejected' && result instanceof CustomCancelledError) {
continue
}
return [identifier, state, result]
}
}
const removeFromWaitList = (identifier) => {
const i = waitList.findIndex(([k, v]) => k === identifier)
assert(i !== -1, 'Couldn\'t find index in waitList for removal')
waitList.splice(i, 1)
const removeFromWaitList = (waitListIdentifier) => {
waitList.delete(waitListIdentifier)
}

const scheduledList = []
Expand All @@ -66,14 +61,9 @@ async function * asyncGeneratorMap (asyncIterable, iteratee, queue, ordered = tr
const internalSchedule = (value, index) => {
addToWaitList(index, async () => {
const [p, cancel] = queue._execCancellableInternal(async () => {
assert(scheduledList.length >= 1, 'scheduledList can\'t be empty')
const output = scheduledList.shift()
assert(output.index === index, 'Removed invalid index from sheduled list')
/*
const i = scheduledList.findIndex((el) => el.index === index)
assert(i !== -1, 'Couldn\'t find index in scheduledList for removal')
scheduledList.splice(i, 1)
*/
try {
return iteratee(value, index, asyncIterable)
} finally {
Expand Down Expand Up @@ -115,12 +105,9 @@ async function * asyncGeneratorMap (asyncIterable, iteratee, queue, ordered = tr
if (state === 'rejected') {
lastIndexFetched += 1
exhausted = true
if (ordered) {
assert(lastIndexReturned < lastIndexFetched, 'invalid index to insert after fetch throw')
results[lastIndexFetched - lastIndexReturned - 1] = { state, result }
} else {
results.push({ state, result })
}
const index = ordered ? lastIndexFetched : lastIndexReturned + 1
assert(index > lastIndexReturned, 'invalid index to insert after result')
results[index - lastIndexReturned - 1] = { state, index, result }
} else {
const { value, done } = result
if (!done) {
Expand All @@ -133,25 +120,20 @@ async function * asyncGeneratorMap (asyncIterable, iteratee, queue, ordered = tr
}
} else { // result
running -= 1
if (ordered) {
assert(lastIndexReturned < identifier, 'invalid index to insert after result')
results[identifier - lastIndexReturned - 1] = { state, result }
} else {
results.push({ state, result })
}
const index = ordered ? identifier : lastIndexReturned + 1
assert(index > lastIndexReturned, 'invalid index to insert after result')
results[index - lastIndexReturned - 1] = { state, index, result }
}
if (results.length >= 1 && results[0] !== undefined) {
while (results.length >= 1 && results[0] !== undefined) {
const result = results.shift()
lastIndexReturned += 1
if (result.state === 'rejected') {
cancelAllScheduled()
throw result.result
} else {
yield result.result
}
while (results.length >= 1 && results[0] !== undefined) {
const result = results.shift()
assert(result.index === lastIndexReturned + 1, 'Invalid returned index')
if (result.state === 'rejected') {
cancelAllScheduled()
throw result.result
} else {
yield result.result
}
rescheduleAllCancelled()
lastIndexReturned += 1
}
if (!fetching && !exhausted && running < queue.concurrency) {
addToWaitList('next', async () => it.next())
Expand All @@ -160,6 +142,7 @@ async function * asyncGeneratorMap (asyncIterable, iteratee, queue, ordered = tr
if (exhausted && lastIndexFetched === lastIndexReturned) {
return
}
rescheduleAllCancelled()
}
}

Expand Down
14 changes: 6 additions & 8 deletions src/asyncGeneratorMap.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,15 @@ test('asyncGeneratorMap same queue three levels concurrency 5 random delays', as
const gen1 = asyncGeneratorMap(range(100), async (x, i) => {
await sleep(Math.floor(Math.random() * 10))
return x * 2
}, queue)
}, queue, true, 1)
const gen2 = asyncGeneratorMap(gen1, async (x, i) => {
await sleep(Math.floor(Math.random() * 10))
return x * 2
}, queue)
}, queue, true, 2)
const gen3 = asyncGeneratorMap(gen2, async (x, i) => {
await sleep(Math.floor(Math.random() * 10))
return x * 2
}, queue)
}, queue, true, 3)
const p = (async () => {
const res = []
for await (const el of gen3) {
Expand All @@ -242,7 +242,7 @@ test('asyncGeneratorMap same queue three levels concurrency 5 random delays', as
})()
const res = await p
expect(res).toStrictEqual([...range(100)].map((x) => x * 8))
}, 10000)
})

test('asyncGeneratorMap same queue three levels infinity random delays', async () => {
const queue = new Queue(Number.POSITIVE_INFINITY)
Expand Down Expand Up @@ -295,7 +295,7 @@ test('asyncGeneratorMap same queue three levels busy queue random delays ', asyn
})()
const res = await p
expect(res).toStrictEqual([...range(100)].map((x) => x * 8))
}, 10000)
})

test('findIndexLimit cancelSubsequent busy queue', async () => {
const findIndexLimit = async (iterable, iteratee, queue) => {
Expand Down Expand Up @@ -352,9 +352,7 @@ test('findIndexLimit cancelSubsequent busy queue', async () => {
queue.cancelAllPending()
})

/**
* @ignore
*/
// eslint-disable-next-line require-jsdoc
class TestError extends Error {}

test('asyncGeneratorMap fail fetch first', async () => {
Expand Down

0 comments on commit 8ed9fcd

Please sign in to comment.