StringStream.whenEnd produces UnhandledPromiseRejectionWarning on error #103

Closed
mathieubergeron opened this issue Mar 2, 2021 · 4 comments

@mathieubergeron

Basics

Hi!
I just discovered Scramjet, it looks awesome! I previously tried highland but ran into issues with how Readable/Writable streams are handled (or had to be handled) on errors: streams were not always closed properly. Moreover, highland no longer seems to be supported. I think Scramjet is a very nice alternative.

So, I'm performing small experiments to understand how Scramjet handles Node.js Readable/Writable streams. More specifically, I want to verify that streams are always properly closed, even on error.

But first, here is a simple test case just to make sure everything works fine:

import { StringStream } from 'scramjet';
import { createReadStream, createWriteStream } from 'fs';

const inputStream = createReadStream('/tmp/in.txt', { encoding: 'utf-8' });
const outputStream = createWriteStream('/tmp/out.txt', { encoding: 'utf-8' });

inputStream.on('close', () => console.log('inputStream close'));
outputStream.on('close', () => console.log('outputStream close'));

StringStream.from(inputStream)
  .setOptions({ maxParallel: 1 })
  .lines()
  .map((line) => {
    console.log(`map -> ${line}`);
    return `${line} - ok\n`;
  })
  .tee(outputStream)
  .whenEnd()
  .then(() => {
    console.log('Done!');
  })
  .catch((err) => {
    console.log(`Error: ${err}`);
  });

Note: I'm not sure if using tee (or copy) instead of pipe is the proper way to write the result into a file. But ultimately, I want to return a Promise that will resolve on end, or reject as soon as an error occurs. On error, the stream should also stop processing new chunks. pipe does not allow me to do that.
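
For comparison, here is a minimal sketch of the behaviour described in the note (resolve on end, reject on the first error, stop processing) using only Node's built-in `stream.pipeline`, not Scramjet. The helper name `processLines` and its parameters are hypothetical, and it takes an in-memory array of lines instead of a file so that it stays self-contained.

```typescript
import { Readable, Transform, Writable, pipeline } from 'stream';
import { promisify } from 'util';

// Promise-returning form of Node's stream.pipeline.
const pipelineAsync = promisify(pipeline);

// Hypothetical helper: maps each line with `mapLine` and writes the result to `sink`.
// The returned Promise resolves once the sink has finished and rejects on the
// first error, in which case pipeline destroys every stream in the chain.
function processLines(
  lines: string[],
  mapLine: (line: string) => string,
  sink: Writable,
): Promise<void> {
  const source = Readable.from(lines);

  const mapper = new Transform({
    objectMode: true,
    transform(line: string, _encoding, callback) {
      let result: string;
      try {
        result = mapLine(line);
      } catch (err) {
        // Report the failure to the stream; no further chunks are transformed.
        callback(err as Error);
        return;
      }
      callback(null, result);
    },
  });

  return pipelineAsync(source, mapper, sink);
}
```

With a file sink, the call would look like `processLines(lines, fn, createWriteStream('/tmp/out.txt'))`, followed by the same `.then()` / `.catch()` chain as in the test case above.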

The standard output of that code is:

map -> 1
map -> 2
map -> 3
map -> 4
map -> 5
map -> 6
map -> 7
map -> 8
map -> 9
map -> 10
map -> 11
map -> 12
map -> 13
map -> 14
map -> 15
map -> 16
map -> 
Done!
inputStream close
outputStream close

Observations/Remarks:

  • The input and output streams are properly closed, as expected. This probably has nothing to do with Scramjet itself since, to my knowledge, Node.js Writable streams automatically close when the Readable ends (right?).
  • This is a minor inconvenience, but Scramjet seems to insert an empty line at the end (in.txt does not actually include an empty line at the end).

Other than that, all seems great.
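
One possible explanation for the extra `map -> ` line, sketched here under two assumptions: that in.txt ends with a newline character (as most text files do), and that `lines()` behaves like a plain split on `\n`. Neither is confirmed in this thread. The helper `splitLines` is hypothetical and only illustrates the effect.

```typescript
// Assumption: the file content ends with a trailing newline.
const content = '1\n2\n3\n';

// A plain split yields one extra, empty piece after the final "\n".
const pieces = content.split('\n');
console.log(pieces); // [ '1', '2', '3', '' ]

// Hypothetical helper: drop only a trailing empty piece, so that genuine
// blank lines in the middle of the text are preserved.
function splitLines(text: string): string[] {
  const parts = text.split('\n');
  if (parts[parts.length - 1] === '') {
    parts.pop();
  }
  return parts;
}

console.log(splitLines(content)); // [ '1', '2', '3' ]
```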

Describe the bug

Then I tried to simulate an error:

import { StringStream } from 'scramjet';
import { createReadStream, createWriteStream } from 'fs';

const inputStream = createReadStream('/tmp/in.txt', { encoding: 'utf-8' });
const outputStream = createWriteStream('/tmp/out.txt', { encoding: 'utf-8' });

inputStream.on('close', () => console.log('inputStream close'));
outputStream.on('close', () => console.log('outputStream close'));

let lineCounter = 0;

StringStream.from(inputStream)
  .setOptions({ maxParallel: 1 })
  .lines()
  .map((line) => {
    console.log(`map -> ${line}`);

    // Simulate error at line 5.
    lineCounter++;
    if (lineCounter === 5) {
      throw new Error('Simulated error');
    }

    return `${line} - ok\n`;
  })
  .tee(outputStream)
  .whenEnd()
  .then(() => {
    console.log('Done!');
  })
  .catch((err) => {
    console.log(`Catched: ${err}`);
  });

And faced two (potentially related) issues:

  • UnhandledPromiseRejectionWarning
  • Output stream is not closed

Here is the output:

map -> 1
map -> 2
map -> 3
map -> 4
map -> 5
Catched: Error: Simulated error
(node:19178) UnhandledPromiseRejectionWarning: Error: The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received type boolean (true)
    at /home/mbergero/dev/speech-practice/dyno/node_modules/scramjet-core/lib/util/mk-transform.js:73:51
    at processTicksAndRejections (internal/process/task_queues.js:93:5)
  caused by:
    at WriteStream.Writable.write (internal/streams/writable.js:285:13)
    at StringStream.ondata (internal/streams/readable.js:719:22)
    at StringStream.emit (events.js:315:20)
    at StringStream.EventEmitter.emit (domain.js:467:12)
    at addChunk (internal/streams/readable.js:309:12)
    at readableAddChunk (internal/streams/readable.js:284:9)
    at StringStream.Readable.push (internal/streams/readable.js:223:10)
    at StringStream.Transform.push (internal/streams/transform.js:166:32)
    at /home/mbergero/dev/speech-practice/dyno/node_modules/scramjet-core/lib/util/mk-transform.js:71:38
    at processTicksAndRejections (internal/process/task_queues.js:93:5)
  --- raised in StringStream(2) constructed ---
    at new PromiseTransformStream (/home/mbergero/dev/speech-practice/dyno/node_modules/scramjet-core/lib/util/promise-transform-stream.js:65:27)
    at new DataStream (/home/mbergero/dev/speech-practice/dyno/node_modules/scramjet-core/lib/data-stream.js:43:9)
    at new StringStream (/home/mbergero/dev/speech-practice/dyno/node_modules/scramjet-core/lib/string-stream.js:34:9)
    at StringStream.map (/home/mbergero/dev/speech-practice/dyno/node_modules/scramjet-core/lib/data-stream.js:197:26)
    at Object.<anonymous> (/home/mbergero/dev/speech-practice/dyno/scramjet-error-handling.ts:15:4)
    at Module._compile (internal/modules/cjs/loader.js:1063:30)
    at Module.m._compile (/opt/nodejs/node-v14.15.4-linux-x64/lib/node_modules/ts-node/src/index.ts:1056:23)
    at Module._extensions..js (internal/modules/cjs/loader.js:1092:10)
    at Object.require.extensions.<computed> [as .ts] (/opt/nodejs/node-v14.15.4-linux-x64/lib/node_modules/ts-node/src/index.ts:1059:12)
    at Module.load (internal/modules/cjs/loader.js:928:32)
(Use `node --trace-warnings ...` to show where the warning was created)
(node:19178) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 5)
(node:19178) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.

inputStream close

To Reproduce

Would you prefer me to create a public repo with that code? That would not be a problem.

Expected behavior

I guess my first question actually is: am I wrong in thinking that closing the output stream on error is the responsibility of Scramjet? If I am wrong, how should I handle that properly?
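
As background for this question, plain Node.js `pipe()` does not close the destination when the source fails; the Node.js documentation notes that the destination has to be closed manually in that case. The sketch below shows only this Node-level behaviour with in-memory streams. It makes no claim about how Scramjet's `tee` is implemented, and the function name is hypothetical.

```typescript
import { PassThrough, Writable } from 'stream';

// Hypothetical demo: reports the destination's `destroyed` flag after the
// source fails, and again after the destination is destroyed explicitly.
function pipeLeavesDestinationOpen(): {
  afterSourceError: boolean;
  afterExplicitDestroy: boolean;
} {
  const source = new PassThrough();
  const destination = new Writable({
    write(_chunk, _encoding, callback) {
      callback();
    },
  });

  source.pipe(destination);

  // Handle the error so it is not reported as an uncaught exception.
  source.on('error', () => {});
  source.destroy(new Error('Simulated error'));

  // pipe() does not forward the failure: the destination is still open.
  const afterSourceError = destination.destroyed;

  // Closing it is up to the caller.
  destination.destroy();
  const afterExplicitDestroy = destination.destroyed;

  return { afterSourceError, afterExplicitDestroy };
}

console.log(pipeLeavesDestinationOpen());
// { afterSourceError: false, afterExplicitDestroy: true }
```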

Test case

If possible, please provide an automated test case to include, better yet in a forked scramjet repo in test/cases.

Let me know if you'd like me to do that.

System

  • OS: Ubuntu 16.04
  • Node version: v14.15.4
  • ts-node version: v9.1.1
  • Scramjet Version: 4.35.12

Thanks a lot! Let me know if I can do anything to help.

@MichalCz
Sponsor Member

MichalCz commented Mar 2, 2021

Hi Mathieu,

Thanks for the extensive report. I will be looking into this over the weekend.

You're probably right that this happens due to an error in tee, but at the same time I'm not sure if this is the best way to achieve what you're trying to do here...

I'll propose a temporary solution and go after the tee'd error.

Best,
M.

@MichalCz
Sponsor Member

Hi, I managed to identify the issue - the raise method seems to return true, which does stop the stream, but in a strange way: by pushing a true chunk, which in fact fails. I'm looking into solutions.
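
The Node-side half of this can be shown in isolation: on Node.js 14 and later, writing a boolean to a Writable that is not in object mode throws `ERR_INVALID_ARG_TYPE`, which matches the message in the warning above. This sketch does not reproduce Scramjet's `raise` logic; `tryWrite` is a hypothetical helper and the sink is an in-memory stand-in for the file stream.

```typescript
import { Writable } from 'stream';

// A byte-oriented sink (object mode is off), standing in for the stream
// returned by fs.createWriteStream.
const sink = new Writable({
  write(_chunk, _encoding, callback) {
    callback();
  },
});

// Hypothetical helper: returns 'ok' or the error code raised by write().
function tryWrite(chunk: unknown): string {
  try {
    // The cast bypasses the type checker, which would reject a boolean here.
    sink.write(chunk as string);
    return 'ok';
  } catch (err) {
    return (err as { code?: string }).code ?? 'unknown';
  }
}

console.log(tryWrite('5 - ok\n')); // ok
console.log(tryWrite(true)); // ERR_INVALID_ARG_TYPE
```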

@MichalCz
Sponsor Member

OK, so I found the problem and a solution; however, the solution breaks almost all current tests and cannot be supported in the current interface.

Sadly no elegant workaround is available, but this works:

https://github.com/scramjetorg/scramjet-issue-tests/blob/main/103-whenend/index-workaround.js

In April/May we will be starting work on a new API for Scramjet v5 - the v4 interface will be supported as is through a compatibility layer.

@mathieubergeron
Author

Thank you for your investigation! I will be looking forward to v5 then :)

MichalCz added a commit to scramjetorg/scramjet-core that referenced this issue Aug 29, 2021