Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throwing an exception from within the forEach callback should cancel the subscription #7

Closed
kruncher opened this issue Jan 2, 2016 · 8 comments

Comments

@kruncher
Copy link

kruncher commented Jan 2, 2016

Here is what I was trying to do:

"use strict";

const Observable = require("zen-observable");

Observable.of(1, 2, 3)
    .forEach(n => {
        console.log(n);

        // Since .forEach returns a promise I assumed that I should be
        // able to return it from here so that errors are propagated up.
        return Observable.of(4, 5, 6)
            .forEach(m => {
                console.log(m);
                throw new Error("ERROR!");
            });
    })
    .then(_ => console.log("completed."))
    .catch(err => console.error(err.stack));

Here is the actual result:

1          
2          
3          
completed. 
4          
5          ! ---------
6          !
4          !
5          !   How come both the forEach's survived an exception?
6          !
4          !
5          !
6          ! ---------

Observations:

  • Errors are silently ignored.
  • "completed." is logged before the inner list has completed.
  • 5 and 6 get logged to the output... I had expected the 'throw' statement to abort the forEach().

What is the best way to handle such a use case with this proposal?

@zenparsing
Copy link
Owner

@kruncher Yes, throwing an exception from within the forEach callback should cancel the subscription. It's a bug in the forEach implementation; thanks for pointing it out!

I'll respond to your other questions in a bit.

@kruncher
Copy link
Author

kruncher commented Jan 2, 2016

@zenparsing thanks for the fast response :)

I have been playing with this and so far the only way that I've been able to detect nested completion is by introducing an "Active Branch Count" variable. Although I am sure that there must be a more eloquent way for me to approach this.

Here is what I came up with in my latest experiment:

"use strict";

const Observable = require("zen-observable");

let activeBranchCount = 0;

function completed() {
    console.log("completed.");
}

Observable.of(1, 2, 3)
    .forEach(n => {
        ++activeBranchCount;

        console.log(n);
        Observable.of(4, 5, 6)
            .forEach(m => {
                console.log(n + ": " + m);
                //throw new Error("ERROR!");
            })
            .then(
                _ => {
                    --activeBranchCount;
                    if (activeBranchCount === 0) {
                        completed();
                    }
                },
                err => {
                    console.error(err.stack);
                }
            );
    })
    .catch(err => console.error(err.stack));

Producing the following result:

1
2
3
1: 4
1: 5
1: 6
2: 4
2: 5
2: 6
3: 4
3: 5
3: 6
completed.   In the right place!   :D

@kruncher
Copy link
Author

kruncher commented Feb 8, 2016

In order to simplify this pattern I have implemented an experimental function that can be used like this:

"use strict";

const Observable = require("zen-observable");

processStream(Observable.of(1, 2, 3), n => {
  console.log(n);
  return Observable.of(4, 5, 6)
    .forEach(m => {
      console.log(m);
    })
})
.then(() => console.log("completed."));


function processStream(stream, callback) {
  if (typeof callback !== "function") {
    callback = undefined;
  }

  return new Promise((fulfill, reject) => {
    let hasFinishedStreaming = false;
    let hasFulfilled = false;
    let waitingCounter = 0;

    function checkHasFulfilled() {
      if (hasFinishedStreaming && !hasFulfilled && waitingCounter === 0) {
        hasFulfilled = true;
        fulfill();
      }
    }

    Promise.resolve(stream)
      .then(resolvedStream =>
        resolvedStream.forEach(streamedItem => {
          if (callback !== undefined) {
            ++waitingCounter;
            callback(streamedItem)
              .then(processedItem => {
                --waitingCounter;
                checkHasFulfilled();
              })
              .catch(reject);
          }
        })
      )
      .then(() => {
        hasFinishedStreaming = true;
        checkHasFulfilled();
      })
      .catch(reject);
  });
}

Note: In my use case I want to always pull all entries from the given stream; but if this was not the desired behaviour then you could just bail straightaway:

function processStream(stream, callback) {
  if (typeof callback !== "function") {
    return Promise.resolve();
  }
  ...

I imagine something like this being a function like Observable#forEach... but of course, perhaps there is a far better way that I'm just not grasping ^_^

Observable.of(1, 2, 3).forEach(n => {
  console.log(n);
  return Observable.of(4, 5, 6)
    .forEach(m => {
      console.log(m);
    })
})
.then(() => console.log("completed."));

EDIT: Here is a chained/inner promise version of my processStream function which seems considerably less hacky than my first attempt:

function processStreamInSeries(s, processor) {
  return Promise.resolve(s).then(stream => new Promise((fulfill, reject) => {
    let innerPromise = Promise.resolve();
    Observable.from(stream)
      .forEach(streamedItem => {
          innerPromise = innerPromise.then(() => Promise.resolve(processor(streamedItem)));
      })
      .then(() => innerPromise.then(fulfill))
      .catch(reject);
  }));
}

function processStreamInParallel(s, processor) {
  return Promise.resolve(s).then(stream => {
    let promises = [];
    return Observable.from(stream)
      .forEach(streamedItem => {
        promises.push(Promise.resolve(processor(streamedItem)));
      })
      .then(() => Promise.all(promises));
  });
}

I'm just sharing this in case anyone looking at this conversation in the future finds it of use.

@zenparsing
Copy link
Owner

@kruncher Sorry I dropped this conversation for so long. In the Rx world, I think they would accomplish something like this by creating a "flatMap" combinator. (In RxJS, I think "flatMap" is an alias for "mergeMap".)

If you wanted to roll your own, it might look something like this:

function flatMap(input, fn) {

    return new Observable(sink => {

        let subs = [], completed = false;

        // Subscribe to the outer Observable
        let outer = input.subscribe({

            next(v) {

                // Call the mapping function
                if (fn) {

                    try { 

                        v = fn(v);

                    } catch (x) { 

                        sink.error(x);
                        return;
                    }
                }    

                // Subscribe to the inner Observable
                let sub = Observable.from(v).subscribe({

                    // Send data to the sink
                    next(v) { sink.next(v) },

                    // Send errors to the sink
                    error(err) { sink.error(err) },

                    // On complete, remove the inner subscription from the list
                    complete() {

                        let i = subs.indexOf(sub);

                        if (i >= 0)
                            subs.splice(i, 1);

                        closeIfDone();
                    }
                });

                // Add the inner subscription to a list
                subs.push(sub);
            },

            // Send errors to the sink
            error(err) { 

                sink.error(err);
            },

            // On complete, if all inner subscriptions are done, finish stream
            complete() {

                completed = true;
                closeIfDone();
            }
        });

        function closeIfDone() {

            if (completed && subs.length === 0)
                sink.complete();
        }

        return _=> {

            // Unsubscribe from each inner observable
            for (let sub of subs)
                sub.unsubscribe();

            // Unsubscribe from the outer observable
            outer.unsubscribe();
        };
    });
}

flatMap(Observable.of(1, 2, 3), x => Observable.of(x, x + 1)).forEach(x => console.log(x));

Also, there are different ways that you might want to flatten a sequence of observables over time. For instance, you might not want to subscribe to the next one until you've exhausted the first one. RxJS has a bunch of them, like "combineLatest", "concatAll", and others.

@kruncher
Copy link
Author

kruncher commented Feb 9, 2016

Thanks; I will try and get my head around how this works :)

On reflection (having watched a really informative video on YouTube) I think that the actual thing that I am looking for is async generators where all this complicated stuff happens below the very sugary syntax.

Thanks again for taking the time to write such a detailed reply!

@zenparsing
Copy link
Owner

No problem. If you're interested in async generator functions you might want to check out the async generator function proposal.

@kruncher kruncher changed the title What is the right way to have nested .forEach's? Throwing an exception from within the forEach callback should cancel the subscription Feb 12, 2016
@kruncher
Copy link
Author

I finally understand how flatMap works and it was indeed exactly what I needed :D

I have changed the title of this issue to reflect the issue with errors not being handled correctly inside forEach calls.

Thanks again!

@zenparsing
Copy link
Owner

Cool! The error issue is fixed with v0.1.10. Thanks again!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants