streams: Add support for non static compose #43664

Closed
rluvaton opened this issue Jul 3, 2022 · 13 comments
Labels
feature request: Issues that request new features to be added to Node.js.
stream: Issues and PRs related to the stream subsystem.

Comments

@rluvaton
Copy link
Member

rluvaton commented Jul 3, 2022

What is the problem this feature will solve?

It will make the code much easier to read/write when the current readable operators are not enough

What is the feature you are proposing to solve the problem?

Adding a non-static compose method so we can chain multiple operators together

It would look like this, for example:

fs.createReadStream("./file.pcapng")
  .compose(async function* (source) {
    let block = Buffer.alloc(0);

    for await (const chunk of source) {
      block = Buffer.concat([block, chunk]);

      if (block.length < REQUIRED_BLOCK_SIZE_TO_GET_TO_TOTAL_LENGTH) {
        continue;
      }

      const blockExpectedTotalSize = getTotalSizeFromBlock(block);

      if (block.length < blockExpectedTotalSize) {
        continue;
      }

      if (block.length >= blockExpectedTotalSize) {
        yield block.slice(0, blockExpectedTotalSize);
        block = block.slice(blockExpectedTotalSize);
      }
    }
  })
  .map((blockBuffer) => parseBufferBasedOnBlockType(blockBuffer))
  .filter((parsedBlock) => parsedBlock.dest === "192.168.1.1")
  .toArray();

What alternatives have you considered?

Using static compose
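
For reference, a rough sketch of what the static alternative looks like today (stream.compose is an existing Node.js API that returns a Duplex, so the chain has to start from the compose() call rather than from the Readable; parseBufferBasedOnBlockType and the block-splitting generator are the hypothetical helpers from the example above):

const { compose } = require("node:stream");

compose(
  fs.createReadStream("./file.pcapng"),
  async function* (source) {
    // same block-splitting generator as in the example above
    yield* source;
  },
)
  .map((blockBuffer) => parseBufferBasedOnBlockType(blockBuffer))
  .filter((parsedBlock) => parsedBlock.dest === "192.168.1.1")
  .toArray();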

@rluvaton rluvaton added the feature request label Jul 3, 2022
@rluvaton
Copy link
Member Author

rluvaton commented Jul 3, 2022

Cc @benjamingr

@benjamingr
Copy link
Member

@nodejs/streams

@VoltrexKeyva VoltrexKeyva added the stream label Jul 3, 2022
@mcollina
Copy link
Member

mcollina commented Jul 3, 2022

What use cases would this solve vs the static compose?

@rluvaton
Copy link
Member Author

rluvaton commented Jul 3, 2022

No real problem, but it makes the code much more readable and consistent with the chaining API.

Let's say you have a transformer at the start and another in the middle; it becomes really hard to read:

Example using static compose:

compose(
  compose(
    fs.createReadStream("./file.pcapng"),
    async function* (source) {
      for await (const chunk of source) {
        // Something here
        yield chunk
      }
    })
    .map((blockBuffer) => parseBufferBasedOnBlockType(blockBuffer))
    .filter((parsedBlock) => parsedBlock.dest === "192.168.1.1"),
  async function* (source) {
    for await (const chunk of source) {
      // Something here too
      yield chunk
    }
  },
)
  .toArray();

And using it as a non-static method:

fs.createReadStream("./file.pcapng")
  .compose(async function* (source) {
    for await (const chunk of source) {
      // Something here
      yield chunk
    }
  })
  .map((blockBuffer) => parseBufferBasedOnBlockType(blockBuffer))
  .filter((parsedBlock) => parsedBlock.dest === "192.168.1.1")
  .compose(async function* (source) {
    for await (const chunk of source) {
      // Something here too
      yield chunk
    }
  })
  .toArray();

@mcollina
Copy link
Member

mcollina commented Jul 4, 2022

I think this use case is already covered by .map():

fs.createReadStream("./file.pcapng")
  .map(async function (chunk) {
    // something async here
    return chunk
  })
  .map((blockBuffer) => parseBufferBasedOnBlockType(blockBuffer))
  .filter((parsedBlock) => parsedBlock.dest === "192.168.1.1")
  .map(async function (chunk) {
    // something async here
    return chunk
  })
  .toArray();

I'm not sure the syntax difference is worth adding another method on Readable. However, I'm happy to hear what others think.

@rluvaton
Copy link
Member Author

rluvaton commented Jul 4, 2022

I think this use case is already covered by .map():

fs.createReadStream("./file.pcapng")
  .map(async function (chunk) {
    // something async here
    return chunk
  })
  .map((blockBuffer) => parseBufferBasedOnBlockType(blockBuffer))
  .filter((parsedBlock) => parsedBlock.dest === "192.168.1.1")
  .map(async function (chunk) {
    // something async here
    return chunk
  })
  .toArray();

I'm not sure the syntax difference is worth adding another method on Readable. However, I'm happy to hear what others think.

.map only maps one input to one output, and there are cases where we want to map one input to many outputs, or many inputs to one output.
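
As a minimal sketch of the difference (names are hypothetical), .map is called once per chunk and must return exactly one value, whereas a generator in a compose-style step can hold state across chunks and yield fewer or more values than it consumes:

// 1:1: each incoming chunk produces exactly one outgoing value.
readable.map((chunk) => transform(chunk));

// many:1: buffer incoming chunks until a whole record is available, then yield it.
// This needs state that survives between chunks, which map's per-chunk callback doesn't have.
async function* toRecords(source) {
  let pending = Buffer.alloc(0);
  for await (const chunk of source) {
    pending = Buffer.concat([pending, chunk]);
    while (pending.length >= RECORD_SIZE) { // RECORD_SIZE is a made-up constant
      yield pending.subarray(0, RECORD_SIZE);
      pending = pending.subarray(RECORD_SIZE);
    }
  }
}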

@rluvaton
Copy link
Member Author

rluvaton commented Jul 4, 2022

Let's say the first transformer is parsing a binary file.

Binary files are usually made of blocks. Let's take, for example, the pcapng file format (a packet capture format that contains a "dump" of data packets captured over a network), as I'm very familiar with it.

A pcapng file is made of blocks, and each block contains its total length, since the size can change depending on the block body:

                        1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 0 |                          Block Type                           |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 4 |                      Block Total Length                       |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 8 /                          Block Body                           /
   /              variable length, padded to 32 bits               /
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                      Block Total Length                       |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Using map, we can't split the file into multiple blocks and parse each one individually.

@benjamingr
Copy link
Member

@mcollina I think a good motivating example of something you can easily do with compose but not with any of the other operators (or more accurately, I haven't found a way, but maybe you know one) is readline.

Consider this pseudocode for a readline implementation:

fs.createReadStream('./someFile').compose(async function* lines(source) {
  let chunks = [];
  for await (const chunk of source) {
    const newLine = chunk.indexOf('\n');
    if (newLine !== -1) {
      yield Buffer.concat([...chunks, chunk.subarray(0, newLine)]);
      chunks = [chunk.subarray(newLine + 1)];
    } else {
      chunks.push(chunk);
    }
  }
}).forEach(processLine);

The code can be made shorter, but my point wasn't brevity; it was to underline how this fundamentally needs to keep state between iterations.

For .map to be able to do this, it'd need a "scope" it can easily reference between loop iterations, e.g.:

async function readlines(stream) {
  let chunks = []; // captured by closure
  return stream.map(function (chunk) {
    const newLine = chunk.indexOf('\n');
    if (newLine !== -1) {
      const line = Buffer.concat([...chunks, chunk.subarray(0, newLine)]);
      chunks = [chunk.subarray(newLine + 1)];
      return line;
    } else {
      chunks.push(chunk);
    }
  });
}

Now this technically works (unlike, for example, RxJS) because iterator helpers are on the iterator and not the iterable, so the function is guaranteed to only run for one stream instance (which is actually very fortunate, because this is a common footgun in .NET land). But it's also less composable (the .map code is not reusable, unlike the compose code) and relies on the function/block plus a closure for scope.
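
To make the reusability point concrete, a sketch assuming the proposed non-static .compose (which doesn't exist yet; lines is the generator from the pseudocode above, pulled out to a standalone declaration, and someOtherStream/processLine are placeholders): the generator is defined once and applied to any Readable, while the closure-based .map version is tied to the function it's written in.

fs.createReadStream('./someFile').compose(lines).forEach(processLine);
someOtherStream.compose(lines).toArray(); // same generator, reused as-is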

@ronag
Copy link
Member

ronag commented Jul 4, 2022

.scan?

@rluvaton
Copy link
Member Author

rluvaton commented Jul 4, 2022

I'm assuming you meant the RxJS scan method; I'm not very familiar with it, but isn't it like reduce?

@ronag
Copy link
Member

ronag commented Jul 6, 2022

I'm assuming you meant the RxJS scan method; I'm not very familiar with it, but isn't it like reduce?

No, they are different.

The upside of a compose is that one can have resource management inside the function, which is not possible with any of the other operators.
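
A sketch of what that looks like, assuming the proposed non-static .compose (openResource and parseRecord are made-up names): the generator's try/finally brackets the whole stream, which a per-chunk callback like .map or .filter has no place for.

readable.compose(async function* (source) {
  const resource = await openResource(); // acquired once for the whole stream
  try {
    for await (const chunk of source) {
      yield parseRecord(resource, chunk);
    }
  } finally {
    await resource.close(); // released on completion, error, or early termination
  }
});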

@ronag
Copy link
Member

ronag commented Jul 6, 2022

@rluvaton Would you mind opening a PR for .compose? Not promising we'll merge it, but I think it's worth further consideration.

@benjamingr
Copy link
Member

.scan is like an iterative reduce with state. It's less powerful than .compose (name pending), which is like the static compose, but I do think it can accommodate the use case mentioned here.
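
Node streams have no .scan today; purely for illustration, RxJS-style scan semantics over an async iterable could look roughly like this (unlike reduce, it emits the running accumulator after every element):

// Illustrative only; not an existing Node.js stream API.
async function* scan(source, reducer, initial) {
  let acc = initial;
  for await (const value of source) {
    acc = reducer(acc, value);
    yield acc; // emit intermediate state, not just the final result
  }
}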

That said, I'd personally like to see more community interest before I'd feel we should do this.
