docs(SPEC): add error handling to the SPEC
Uses a new hypothetical type syntax that also needs to be fully
specified.

Fixes #463
nathanielc committed Dec 20, 2018
1 parent 8baaec7 commit 36392d8
189 changes: 177 additions & 12 deletions docs/SPEC.md
@@ -843,6 +843,15 @@ A function produces side effects when it is explicitly declared to have side effects.

Flux contains many preassigned values in the universe block.

### Fail

Fail is a function that immediately terminates the Flux program.

Fail takes a single parameter:

* message - string
    The message to report as the reason for failing.
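
For example, a call might look like this (a sketch; how the message is surfaced to the user is implementation-defined):

    // Abort the program, reporting the reason.
    fail(message: "unexpected schema: missing _value column")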

### Time constants

#### Days of the week
@@ -1086,6 +1095,10 @@ The data model consists of tables, records, columns and streams.

A record is a tuple of named values and is represented using an object type.

A record type is an object type that is not required to have any properties and may have any property.

    type record = {; any }
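
Under this type, any Flux object value can act as a record. An illustrative, non-normative example:

    // A record with a time, a value, and a tag.
    r = {_time: 2018-12-20T00:00:00Z, _value: 2.5, host: "serverA"}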

### Column

A column has a label and a data type.
@@ -1101,6 +1114,17 @@ The available data types for a column are:
time a nanosecond precision instant in time
duration a nanosecond precision duration of time

A column's type consists of a description of the column and its values:

    type columnMeta = {
        label :: string,
        type :: type,
        grouped :: bool,
    }
    type column = {
        meta :: columnMeta,
        values :: [...]record,
    }
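
For instance, an ungrouped float column labeled `usage_idle` (a hypothetical label) would be described by a meta value like the following, written in the same hypothetical syntax:

    {label: "usage_idle", type: float, grouped: false}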

### Table

@@ -1113,15 +1137,51 @@ These common values are referred to as the group key value, and can be represented as an object.

A table's schema consists of its group key, and its columns' labels and types.

A table is represented using the following type:

    type table = {
        columns :: []column,
    }


A schema type is also defined that is useful for inspecting a table's schema while ignoring the actual data.

    type schema = {
        // List of columns on the table
        columns :: []columnMeta,
    }
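
An illustrative schema value for a table grouped by a single tag might be (again in the hypothetical syntax):

    {columns: [
        {label: "_time", type: time, grouped: false},
        {label: "_value", type: float, grouped: false},
        {label: "host", type: string, grouped: true},
    ]}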

### Channel of tables

A channel represents a potentially unbounded set of tables.
A channel is grouped into individual tables using the group key.
Within a channel each table's group key value is unique.

    type channel = {
        tables :: [...]table,
    }

### Stream

A stream represents two channels, a data channel and a meta channel.
The data channel contains the data being processed.
The meta channel contains only metadata about the processing of the data.
Transformation functions each consume a stream and produce a stream.

The meta channel requires that all tables contain at least a `name` column with a string type.
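
For example, an errors table on the meta channel might carry a `name` column identifying the table and a `message` column describing each error (an illustrative layout, not normative):

    name     message
    ------   ---------------------------------
    errors   field type conflict: float != int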

A stream's type is represented as a set of named channels:

    type stream = {
        // Data is the raw data to be processed.
        // Transformations operate on the data channel.
        data :: channel,
        // Meta is a channel of collected metadata.
        // Transformations do not operate on the meta channel,
        // but they may add tables to it.
        meta :: channel,
    }

### Missing values

@@ -1134,18 +1194,30 @@ The _null_ value can be of any data type.
### Transformations

Transformations define a change to a stream.
Transformations must consume an input stream and always produce a new output stream.

A transformation produces side effects when it is constructed from a function that produces side effects.
Transformations always operate on the data channel within the stream.
Transformations do not operate on the meta channel within the stream; however, they may add new tables to it.
The meta channel may contain arbitrary data about the processing of the data channel.
For example, a transformation may add a table to the meta channel containing errors that were encountered.

Most transformations output one table for every table they receive from the input data channel.
Transformations that modify the group keys or values will need to regroup the tables in the output channels.

Transformations are represented using function types.

    type transformation = (tables :: <-stream) -> stream
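
For example, a user-defined transformation conforming to this type might look like this (a sketch):

    positiveOnly = (tables=<-) =>
        tables
            |> filter(fn: (r) => r._value > 0.0)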

### Sources

Sources are functions that produce a stream.

    type source = () -> stream
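
For example, `from` behaves as a source once applied to its arguments (a sketch; the bucket name is illustrative):

    tables = from(bucket: "telegraf/autogen")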

### Built-in sources and transformations

The following functions are preassigned in the universe block.
These functions each define a transformation or a source.

#### From

@@ -2757,6 +2829,56 @@ If you need to convert other columns use the `map` function directly with the `uint` function.

[IMPL#242](https://github.com/influxdata/platform/issues/242) Update specification around type conversion functions.

#### Errors

Errors selects error tables from the meta channel and places them on the data channel.
The input data channel is dropped.
The output meta channel is empty.

Example:

    // Perform a simple downsample
    from(bucket:"telegraf/autogen")
        |> range(start: -5m)
        |> aggregateWindow(every: 1m, fn: mean)
        |> to(bucket: "telegraf/5m")
        |> errors()
        // Send all errors during downsampling to S3
        |> s3.to(bucket: "example.com/errors")


#### Meta

Meta places all tables from the meta channel into the data channel.
The input data channel is dropped.
The output meta channel is empty.

Example:

// The errors function is implemented in terms of the meta function.
    // The errors function is implemented in terms of the meta function.
    errors = (tables=<-) =>
        tables
            |> meta()
            |> filter(fn: (r) => r.name == "errors")


#### Stats

Stats selects stats tables from the meta channel and places them on the data channel.
The input data channel is dropped.
The output meta channel is empty.

Example:

    // Ship data to S3
    from(bucket:"telegraf/autogen")
        |> range(start: -5m)
        |> s3.to(bucket: "example.com/telegraf")
        |> stats()
        // Write S3 stats back to InfluxDB
        |> to(bucket: "stats")
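
By analogy with `errors`, `stats` could plausibly be implemented in terms of `meta` (a sketch, assuming stats tables are named "stats" on the meta channel):

    stats = (tables=<-) =>
        tables
            |> meta()
            |> filter(fn: (r) => r.name == "stats")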



### Composite data types

@@ -2793,6 +2915,49 @@ A query specification defines what data and operations to perform.
The execution model reserves the right to perform those operations as efficiently as possible.
The execution model may rewrite the query in any way it sees fit while maintaining correctness.

## Handling data errors

Data errors are errors that occur while processing data from streams.
When a data error occurs, there are a few methods for handling it.

First, there is the `fail` function that terminates the program.
This can be used whenever an error occurs that is not recoverable in any form.

Second, there is the `errors` function that provides direct access to the errors.
Using the `errors` function, any behavior Flux can describe is a possible method for handling errors.

There is an option `errorHandler` that defines the default behavior for all data errors.
The option value is a transformation function that consumes errors produced by other transformations.

The default behavior is to fail when an error occurs.

    option errorHandler = (tables=<-) =>
        tables
            // Fail on any error
            |> map(fn: (r) => fail(message: r.message))

    // Change the default to log errors to the query system bucket.
    // Maybe this should just take an explicit bucket instead of having a system bucket?
    option errorHandler = influxdb.log

The errorHandler transformation is automatically appended to the end of any transformation chain.
To disable this global behavior, set the errorHandler to a no-op:

    // Disable global error handling
    option errorHandler = (tables=<-) => tables

Using the `errors` transformation function, specific error handling may be applied to a particular section of a Flux program.

    // Perform a simple downsample
    from(bucket:"telegraf/autogen")
        |> range(start: -5m)
        |> aggregateWindow(every: 1m, fn: mean)
        |> to(bucket: "telegraf/5m")
        |> errors()
        // Send all errors during downsampling to S3
        |> s3.to(bucket: "example.com/errors")


## Request and Response Formats

Included with the specification of the language and execution model, is a specification of how to submit queries and read their responses over HTTP.
