
docs(SPEC): add error handling to the SPEC #586

Open · wants to merge 3 commits into base: master

Conversation · 6 participants
@nathanielc (Contributor) commented Dec 20, 2018

Uses a new hypothetical type syntax that also needs to be fully
specified.

Fixes #463
Fixes #438

Done checklist

  • docs/SPEC.md updated
  • Test cases written

@nathanielc nathanielc force-pushed the docs/error-handling-spec branch from 36392d8 to 447037b Dec 20, 2018

@nathanielc (Contributor, Author) commented Dec 20, 2018

There is a lot in this PR, so to help focus the discussion let me explain a few things.

This PR uses a new syntax for describing types; this was necessary to be able to talk about Flux types in any meaningful way. Please do not discuss/fret about the syntax being used. I'll explain it shortly here, and then I will create a follow-up PR that updates the grammar for this syntax, where we can debate its merits.

A quick overview of the type syntax. A type expression reads like any normal Flux expression except that it must always describe a literal value. A few new operators were added:

  • type <ident> = <type> - defines a new named type
  • :: - means that a type follows
  • {<lower>[; <upper>]} - defines an object type with explicit lower bound and optional upper bound. The any keyword means the set of all possible keys.
  • {; any} - from the above, this is a type that has no lower bound and its upper bound is infinite.
  • (<args>) -> <type> - means a function type
  • []<type> - means a list of that type
  • [...]<type> - means a generator that produces <type>.
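Putting those pieces together, here is a hedged sketch of how the stream types read in this syntax, assembled from the diff excerpts quoted later in this thread (field names and punctuation are illustrative, not normative):

```flux
// Sketch only: assembled from the diff excerpts in this PR.
// A table is a list of columns.
type table = {
    columns :: []column,
}

// A channel is a generator producing tables.
type channel = {
    tables :: [...]table,
}

// A stream pairs a data channel with a meta channel.
type stream = {
    data :: channel,
    // Transformations do not operate on the meta channel,
    // but they may add tables to it.
    meta :: channel,
}
```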

OK with that out of the way, just go with it. The purpose of this PR is to discuss how error handling works.

The basic concept is that each stream has two channels: a data channel and a meta channel. The data channel is what we currently have; the meta channel is new, and contains any metadata about the transformation. The meta channel can be explicitly accessed to "handle" errors. Transformations do not read the meta channel, but rather add to it any time they encounter an error.

An errorHandler option defines the default error handling behavior.
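As a sketch, the default handler could consume the meta channel's errors and fail on the first one (this matches the form proposed later in this thread):

```flux
// Default behavior sketch: fail when any error occurs.
option errorHandler = (tables=<-) =>
    tables
        |> errors()
        |> map(fn: (r) => fail(message: r.message))
```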

@nathanielc (Contributor, Author) commented Dec 20, 2018

If you want to discuss the type syntax, see #587; I wrote up an official PR for the SPEC on it. I made some changes from how this PR uses it, but I will not change this PR until the other is accepted.

@aanthony1243 (Member) left a comment

final thoughts:

  • should we have a stats handler?
  • should we be allowed to define an arbitrary stats handler? Can we generalize the idea of a meta-data handler?
  • can custom metadata be expressed, and defined, in flux?

I think we are like 90% of the way there, but this work will require a delicate balance between making error handling easy to use and not making generalized assumptions about how users want to handle errors. Overall, I like the design, and I can see how this will smoothly fit into our existing data/execution model which is good.

[IMPL#463](https://github.com/influxdata/flux/issues/463) Specify the primitive types that make up stream and table types
A channel represents a potentially unbounded set of tables.

@aanthony1243 (Member), Dec 21, 2018:

great improvement in terminology! everyone imagines a stream as an endless supply of bits of data. I already feel clearer in my mind thinking of a channel instead of a stream.

[IMPL#463](https://github.com/influxdata/flux/issues/463) Specify the primitive types that make up stream and table types
A channel represents a potentially unbounded set of tables.
A channel is grouped into individual tables using the group key.

@aanthony1243 (Member), Dec 21, 2018:

I've found it important in explanations to clarify that there are group-key columns, which are different from a group key: the group key is columns + values, and identifies a particular table.

In the context of a channel, there is exactly one set of group-key columns. Explaining Flux concepts can be greatly enhanced by formalizing this.

@affo (Contributor), Mar 4, 2019:

Yeah, I do agree.

A single set of group key columns; multiple group key values.

tables :: [...]table,
}
### Stream

@aanthony1243 (Member), Dec 21, 2018:

can we riff on the channel idea? would band be too strange?

@nathanielc (Author, Contributor), Jan 2, 2019:

Yeah I think we need better names. I just picked the first thing that came to mind. Open to ideas. Are you thinking a stream is a band or a channel is a band?

The meta channel may contain arbitrary data about the processing of the data channel.
For example a transformation may add a table to the meta channel containing errors that were encountered.
Most transformations output one table for every table they receive from the input data channel.

@aanthony1243 (Member), Dec 21, 2018:

let's give these names. simple transformations and channel transformations ? grouping transformations ?

@affo (Contributor), Mar 4, 2019:

Is it really useful to say this? It will be clear from examples. Or, maybe, this is the moment to provide an example for clarification after this great introduction to channels and streams.

|> range(start: -5m)
|> aggregateWindow(every: 1m, fn: mean)
|> to(bucket: "telegraf/5m")
|> errors()

@aanthony1243 (Member), Dec 21, 2018:

I think it will be convenient to have a filter function as an optional parameter to errors(), or else make sure we name this nicely so that we can install a helper function that takes a function parameter.

@wolffcm (Contributor), Dec 27, 2018:

You could also just apply filter() to the output of errors(), since the errors will be represented just like regular data.
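A hedged sketch of that approach (the `message` column on error tables is an assumption):

```flux
from(bucket: "telegraf/autogen")
    |> range(start: -5m)
    |> to(bucket: "telegraf/5m")
    |> errors()
    // Error tables are ordinary data here, so filter() applies.
    |> filter(fn: (r) => r.message =~ /timeout/)
```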

from(bucket:"telegraf/autogen")
|> range(start: -5m)
|> s3.to(bucket: "example.com/telegraf")
|> stats()

@aanthony1243 (Member), Dec 21, 2018:

same as for errors: can we have some sort of filtering here? Compared to meta(), the point is to filter metadata conveniently. Let's take it a step further.
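One possible shape for that, as a sketch (the `bytesWritten` column and the `s3` import are hypothetical):

```flux
import "s3" // hypothetical package, as used in the excerpt above

from(bucket: "telegraf/autogen")
    |> range(start: -5m)
    |> s3.to(bucket: "example.com/telegraf")
    |> stats()
    // Stats tables land on the data channel, so normal filtering works.
    |> filter(fn: (r) => r.bytesWritten > 0)
```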

First, there is the `fail` function that terminates the program.
This can be used whenever an error occurs that is not recoverable in any form.

Second, there is the `errors` function that provides direct access to the errors.

@aanthony1243 (Member), Dec 21, 2018:

the errors and stats functions should be configurable to define what happens to the channels. Under some conditions, we may want to leave the data channel in place.
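For example, a hypothetical parameter (not part of this PR) could let errors() leave the data channel in place:

```flux
// Hypothetical dropData parameter: when false, error tables are
// emitted alongside the original data channel instead of replacing it.
from(bucket: "telegraf/autogen")
    |> range(start: -5m)
    |> errors(dropData: false)
```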


The default behavior is to fail when an error occurs.

option errorHandler = (tables=<-) =>

@aanthony1243 (Member), Dec 21, 2018:

or, perhaps this solves my problem from above about what errors and stats should do.

@affo (Contributor), Mar 4, 2019:

Really like this 👍

|> range(start: -5m)
|> aggregateWindow(every: 1m, fn: mean)
|> to(bucket: "telegraf/5m")
|> errors()

@aanthony1243 (Member), Dec 21, 2018:

once you do this, you commit to losing the data channel. Any way to recover it?

@nathanielc (Author, Contributor), Jan 2, 2019:

No, but there is a way to preserve it before you lose it. You can assign the result to a variable, then in one branch inspect the errors and in another branch inspect the original data.

data = from() |> ...

data 
    |> errors()

data
    |> doSomething() // this branch's errors are not handled by the branch above, since they occur downstream of that error handling.

@wolffcm (Contributor) left a comment

I dig it. Errors terminate by default, but if you want other behavior it is intuitive to get it. I guess the execution subsystem implicitly calls the error handler after each source or transform?

A table is represented using the following type:
type table = {
columns :: []column,

@wolffcm (Contributor), Dec 27, 2018:

It would be good to call out that each column must be the same length.

type channel = {
tables :: [...]table,
}

@wolffcm (Contributor), Dec 27, 2018:

Does the channel have a notion of the tables being ordered, i.e., FIFO semantics? It's described as a "set" above, which makes this ambiguous.

@nathanielc (Author, Contributor), Jan 17, 2019:

I am not sure if order matters. But the type requires that they come in some kind of order, because of the generator.

@affo (Contributor), Mar 4, 2019:

Bringing this into stream processing, this means that every table should include either _start, or _stop, or _time, or some incremental _id in the group key columns for the computation to proceed. Do you agree?

nathanielc added some commits Dec 20, 2018

docs(SPEC): add error handling to the SPEC
Uses a new hypothetical type syntax that also needs to be fully
specified.

Fixes #463
Fixes #438

@nathanielc nathanielc force-pushed the docs/error-handling-spec branch from 447037b to 548e164 Jan 17, 2019

@jlapacik (Contributor) left a comment

I like it. However I think you should remove some of the language in the transformations section, as it's a bit confusing.

Transformations that modify the group keys or values will need to regroup the tables in the output stream.
A transformation produces side effects when it is constructed from a function that produces side effects.
Transformations always operate on the data channel within the stream.
Transformations do not operate on the meta channel within the stream, however they may add new tables to it.

@jlapacik (Contributor), Feb 13, 2019:

These two sentences seem a bit inconsistent. meta and errors consume from the meta channel directly. And all transformations will have the ability to add tables to the meta channel in case of errors. Perhaps describe what the meta channel is used for but remove these two sentences.

@affo (Contributor), Mar 4, 2019:

I think that what you want to say is that user-defined behavior in transformations (like higher-order functions) cannot arbitrarily manipulate meta channels. Meta channels are manipulated by the transformations as a side effect of applying the user-defined behavior, be it errors produced by UDF application, statistics about performance, or else.

Transformations always operate on the data channel within the stream.
Transformations do not operate on the meta channel within the stream, however they may add new tables to it.
The meta channel may contain arbitrary data about the processing of the data channel.
For example an transformation may add a table to the meta channel containing error that were encountered.

@jlapacik (Contributor), Feb 13, 2019:

an transformation -> a transformation
containing error -> containing errors


The default behavior is to fail when an error occurs.

option errorHandler = (tables=<-) =>

@jlapacik (Contributor), Feb 13, 2019:

Putting aside the implementation of fail, this should be:

option errorHandler = (tables=<-) =>
    tables
        |> errors()
        |> map(fn: (r) => fail(message: r.message))
// Transformations do not operate on the meta channel,
// but they may add tables to it.
meta: channel,
}

@affo (Contributor), Mar 4, 2019:

We should further clarify that there is no total order relationship between data and meta channels.

For example an transformation may add a table to the meta channel containing error that were encountered.

Most transformations output one table for every table they receive from the input data channel.
Transformations that modify the group keys or values will need to regroup the tables in the output channels.

Transformations are repsented using function types.

@affo (Contributor), Mar 4, 2019:

repsented -> represented

Errors selects error tables from the meta channel and places them on the data channel.
The input data channel is dropped.
The output meta channel is empty.

@affo (Contributor), Mar 4, 2019:

Should an error be a specific type:

type error = {
    message :: string,
    reference :: string,
}

?

So that users know what to expect if they want to process errors?
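With a well-defined error type like the one above, consumers could rely on its columns; a sketch (the `message` and `reference` columns come from the proposed type, which is not yet in the SPEC):

```flux
from(bucket: "telegraf/autogen")
    |> range(start: -5m)
    |> errors()
    // message and reference are the columns of the proposed error type.
    |> filter(fn: (r) => r.reference != "")
    |> group(columns: ["reference"])
```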

// Write S3 stats back to InfluxDB
|> to(bucket: "stats")

@affo (Contributor), Mar 4, 2019:

Should stats also be a well-defined type?

@affo (Contributor) commented Mar 4, 2019

This looks great! However, I have some concerns.

Suppose you have some function with side effects. With this model, where there is no order relationship between meta and data, you can never be sure that your computation fails at the first error encountered before it starts processing something else. So the side-effecting transformation could produce unintended side effects after an error is encountered. This could be "fixed" by calling the error handler function upon error.

What is the purpose of making the meta channels pass through each and every function? This leads to weird questions like: what happens to the meta channel when I group?

Maybe, what we need is simpler than this: a global error channel and one meta channel per transformation. If one wants to correlate stats/meta, he can extract every meta channel and perform joins, etc.

If one wants to populate the global error stream, he can write the errorHandler so that it writes the error to the global error channel. We would need a `to` function that allows producing records to a channel.

So, the error handler should be a function that takes an error as input

option errorHandler = (err) => {
   fail(message: err.message) // for example
}

This is still fragile, because with task parallelism we cannot even ensure that, while you get an error, another stage of the data processing pipeline has not started. So it could be that the error is processed after some upstream transformation has started operating on fresh data.

Example of global error channel:

err = channel() // channel constructor, or provide global err as builtin variable?

option errorHandler = (e) => {
    to(ch: err)
}

// ... transformations

err |> // process errors if you like
@jsternberg (Contributor) commented Mar 7, 2019

This seems fine to me.
