Skip to content

Commit

Permalink
wip newMapLimitInternal
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolas-van committed Oct 14, 2021
1 parent 185250c commit fc2c358
Showing 1 changed file with 24 additions and 28 deletions.
52 changes: 24 additions & 28 deletions src/mapLimitInternal.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@ async function * mapLimitInternal (asyncIterable, iteratee, queue) {

let lastIndexFetched = -1
let fetching = false
let hasValue = false
let lastValue
let exhausted = false

const waitList = []

const addToWaitList = (identifier, fct) => {
const p = (async () => {
try {
Expand All @@ -30,54 +27,53 @@ async function * mapLimitInternal (asyncIterable, iteratee, queue) {
})()
waitList.push([identifier, p])
}
const raceWaitList = async () => {
const [identifier, state, result] = await Promise.race(waitList.map(([k, v]) => v))
if (state === 'rejected') {
throw result
}
const i = waitList.findIndex(([k, v]) => k === identifier)
assert(i !== -1)
waitList.splice(i, 1)
return [identifier, result]
}

let lastIndexReturned = -1
const results = []
let running = 0

addToWaitList('next', async () => it.next())
while (true) {
if (!hasValue && !fetching && !exhausted && running < queue.concurrency) {
addToWaitList('next', async () => it.next())
fetching = true
}
const [identifier, state, result] = await Promise.race(waitList.map(([k, v]) => v))
const i = waitList.findIndex(([k, v]) => k === identifier)
waitList.splice(i, 1)
if (state === 'rejected') {
throw result
}
const [identifier, result] = await raceWaitList()
if (identifier === 'next') {
const { value, done } = result
assert(!hasValue, 'invalid state')
lastValue = value
fetching = false
if (!done) {
hasValue = true
lastIndexFetched += 1
running += 1
addToWaitList(lastIndexFetched, async () => {
return queue.exec(async () => iteratee(value, lastIndexFetched, asyncIterable))
})
} else {
hasValue = false
exhausted = true
}
} else { // result
running -= 1
assert(lastIndexReturned < identifier, 'invalid state')
results[identifier - lastIndexReturned - 1] = { value: result }
while (results.length >= 1 && results[0] !== undefined) {
const result = results.shift()
lastIndexReturned += 1
yield result.value
}
}
while (results.length >= 1 && results[0] !== undefined) {
const result = results.shift()
lastIndexReturned += 1
yield result.value
if (!fetching && !exhausted && running < queue.concurrency) {
addToWaitList('next', async () => it.next())
fetching = true
}
if (exhausted && lastIndexFetched === lastIndexReturned) {
return
}
if (hasValue) {
addToWaitList(lastIndexFetched, async () => {
return queue.exec(async () => iteratee(lastValue, lastIndexFetched, asyncIterable))
})
running += 1
hasValue = false
}
}
}

Expand Down

0 comments on commit fc2c358

Please sign in to comment.