Skip to content

parallelMerge() does not propagate return() back to the merged iterables #46

@dko-slapdash

Description

@dko-slapdash

Example code:

import delay from "delay";
import { parallelMerge } from "streaming-iterables";

async function* iterable(name: string, dt: number) {
  try {
    for (let i = 0; ; i++) {
      console.log(`${name}: ${i}`);
      yield `${name}: ${i}`;
      await delay(dt);
    }
  } finally {
    console.log(`Exited ${name}`);
  }
}

async function* caller() {
  //yield* iterable("A", 900);
  yield* parallelMerge(iterable("A", 900));
}

async function main() {
  for await (const message of caller()) {
    if (message.includes("4")) {
      break;
    }

    console.log(`Received ${message}`);
  }

  console.log("Finishing");
  await delay(3000);
}

main().catch((e) => console.log(e));

In this example I do a "dummy" merging of 1 iterable for simplicity (but we can merge multiple, the effect persists). The output is:

A: 0
Received A: 0
A: 1
Received A: 1
A: 2
Received A: 2
A: 3
Received A: 3
A: 4
Finishing

Notice that finally {} block in iterable() function was never executed. But it should: replace the call to parallelMerge() with yield* iterable("A", 900); to see the correct output (with "Exited A"):

A: 0
Received A: 0
A: 1
Received A: 1
A: 2
Received A: 2
A: 3
Received A: 3
A: 4
Exited A
Finishing

How it works: both for-await and yield* instructions call the source iterator's return() method once the loop is over, and they propagate that signal further up the stack. I think parallelMerge() just doesn't do this.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions