Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Suggestion on Handling errors in Channels #4

Closed
jayjanssen opened this issue Mar 26, 2018 · 4 comments
Closed

Suggestion on Handling errors in Channels #4

jayjanssen opened this issue Mar 26, 2018 · 4 comments

Comments

@jayjanssen
Copy link

This departs a bit from the go spec, but I'm considering how to integrate error handling from my channel writers so my readers know something broke upstream.

My current workflow (pre-channels) is that my "getter" function returns a promise that either resolves with the entire get data set, or throws an error if it encounters something getting that data. The getter function can internally be doing multiple webservice/database calls, any of which may result in an distinct error. In pseudo-code (sorry, I think coffeescript), the workflow is like this:

something.getAll()
.each (obj) -> # one object of my dataset
.catch (err) ->
  console.log "Error fetching the data!"

Under channels, I can return a channel, and I can get data from the channel and know when it is closed, but not if the getter had an error or not.

something.getChannel()
.then (channel) ->
  channel.forEach (obj) ->
    # each object of my dataset
.catch (err) ->
  console.log "Exception from getChannel, but not from the thread pushing data to the channel

What I'm wondering is if it makes any sense for a channel writer to be able to throw an exception via the channel so the reader can know the datastream is incomplete:

channel.closeWithError( new Error "Something's Wrong")

and on the receiver, I can catch it:

something.getChannel()
.then (channel) ->
  channel.forEach (obj) ->
     # each object of my dataset
  .catch (err) ->
     console.log "Channel closed with error!"
.catch (err) -> 
   console.log "Error from getChannel"

The only other solution to this I can think of is if my getChannel returns both the channel and a separate promise that rejects if there is a processing issue. That seems clunkier however and I then have to process both the channel and the fetching process to ensure I have gotten all my data.

Mostly I'm just suggesting this as a possible feature and I'm interested in your take on it.

@NodeGuy
Copy link
Owner

NodeGuy commented Mar 28, 2018

Great question! It inspired me to make a change.

As of 0.6.3 you can send promises through channels:

const channel = Channel();

(async () => {
  try {
    const devcorn = await channel.shift();
  } catch (exception) {
    console.error(`Sorry, no unicorns today.`);
  }
})();

await channel.push(
  fetch(`https://twitter.com/chjango/status/925787871973400576`)
);

Now you can find out idiomatically whether there's an error with each datum. Does that solve your problem?

@jayjanssen
Copy link
Author

Hmm, I'm not sure. What I really want to have is a single promise on the larger operation populating the channel, not each channel item.

I've solved this myself for now like this: (sorry, I'm not very es7 fluent yet)

search_channel = (query) -> Promise.try () ->
  ch = Channel(100)
  search_op = new Promise (resolve, reject) ->
     handle_response = (err, response) ->
        if err?
           ch.close()
           return reject( err )
       
        for hit in response.hits
            ch.push hit

        if response.more?
             some_client.search_next( 100, handle_response)
        else
             ch.close()
             return resolve()
     some_client.search( query, 100, handle_response)
      
  return {
    channel: ch.readOnly()
    result: search_op
  } 

Then in my main code:

search_channel( "foo=bar")
.then (s) ->
   s.channel.forEach (item) ->
       # process each item
  .then () ->
      s.result.then () ->
         # Fetch completed successfully
      .catch (err) ->
         # Fetch had an error

Essentially the channel always gets closed, and after the forEach I check the result promise to ensure whether or not I got everything.

@jayjanssen
Copy link
Author

Just an update here, I do like the ability to send a rejection via the channel as it allows my consumer to process all items that do get pushed to the channel up until it hits an error. This simplifies my code a lot. Here's a real Elasticsearch fetcher function that returns a channel

exports.search_channel = (query, ch=Channel(100)) ->
  do () ->
    query.scroll = '30s'
    query.size = ch.length
    total = 0
    loop
      try
        response = await if total == 0
          log.debug "initial fetch"
          es_client.search query
        else
          log.debug "fetching more"
          es_client.scroll {
            scrollId: response._scroll_id
            scroll: '30s'
          }

        for hit in response.hits.hits
          await ch.push hit._source
          total += 1
      catch err
        await ch.push Promise.reject err
        break

      break unless response.hits.total > total
    ch.close()

  ch.readOnly()

The consumer for this can work like this:

channel = es.search_channel { index: "foo", query: "*"}
try
   await channel.forEach (record) ->
     # process record
catch err
   # Fetch error

I guess the only thing this lacks is the ability to throw a channel exception that would allow the consumer to stop processing as soon as there is an error, regardless of how many items are in the channel.

@NodeGuy
Copy link
Owner

NodeGuy commented Apr 5, 2018

.forEach should stop and return a rejected promise immediately if there are any errors in the channel so your consumer code above should work as written.

@NodeGuy NodeGuy closed this as completed Jun 5, 2018
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants