diff --git a/docs/SPEC.md b/docs/SPEC.md index f9e4a88960..cb0d19915b 100644 --- a/docs/SPEC.md +++ b/docs/SPEC.md @@ -843,6 +843,15 @@ A function produces side effects when it is explicitly declared to have side eff Flux contains many preassigned values in the universe block. +### Fail + +Fail is a function that immediately terminates the Flux program. + +Fail takes a single parameter: + +* message - string + Message is a string message to report as the reason for failing. + ### Time constants #### Days of the week @@ -1086,6 +1095,10 @@ The data model consists of tables, records, columns and streams. A record is a tuple of named values and is represented using an object type. +A record type is an object that is not required to have any properties and may have any property. + + type record = {; any } + ### Column A column has a label and a data type. @@ -1101,6 +1114,17 @@ The available data types for a column are: time a nanosecond precision instant in time duration a nanosecond precision duration of time +A column's type consists of a description of column and its values: + + type columnMeta = { + label :: string, + type :: type, + grouped :: bool, + } + type column = { + meta :: columnMeta, + values :: [...]record, + } ### Table @@ -1113,15 +1137,51 @@ These common values are referred to as the group key value, and can be represent A tables schema consists of its group key, and its column's labels and types. -[IMPL#463](https://github.com/influxdata/flux/issues/463) Specify the primitive types that make up stream and table types +A table is represented using the following type: + + type table = { + columns :: []column, + } + + +A schema type is also defined that is useful for inspecing a table's schema ignoring the actual data. + + type schema = { + // List of columns on the table + columns :: []columnMeta, + } -### Stream of tables -A stream represents a potentially unbounded set of tables. -A stream is grouped into individual tables using the group key. -Within a stream each table's group key value is unique. +### Channel of tables -[IMPL#463](https://github.com/influxdata/flux/issues/463) Specify the primitive types that make up stream and table types +A channel represents a potentially unbounded set of tables. +A channel is grouped into individual tables using the group key. +Within a channel each table's group key value is unique. + + type channel = { + tables :: [...]table, + } + +### Stream + +A stream represents two channels, a data channel and a meta channel. +The data channel contains the data being processed. +The meta channel contains only meta data about the processing of the real data. +Transformations functions each consume a stream and produce a stream. + +The meta channel requires that all tables contain at least a `name` column with a string type. + +A stream's type is represented as a set of named channels: + + type stream = { + // Data is the raw data to be processed. + // Transformations operate on the data channel. + data :: channel, + // Meta is a channel of collected metadata. + // Transformations do not operate on the meta channel, + // but they may add tables to it. + meta :: channel, + } ### Missing values @@ -1134,18 +1194,30 @@ The _null_ value can be of any data type. ### Transformations Transformations define a change to a stream. -Transformations may consume an input stream and always produce a new output stream. +Transformations must consume an input stream and always produce a new output stream. -Most transformations output one table for every table they receive from the input stream. -Transformations that modify the group keys or values will need to regroup the tables in the output stream. -A transformation produces side effects when it is constructed from a function that produces side effects. +Transformations always operate on the data channel within the stream. +Transformations do not operate on the meta channel within the stream, however they may add new tables to it. +The meta channel may contain arbitrary data about the processing of the data channel. +For example an transformation may add a table to the meta channel containing error that were encountered. + +Most transformations output one table for every table they receive from the input data channel. +Transformations that modify the group keys or values will need to regroup the tables in the output channels. Transformations are repsented using function types. -### Built-in transformations + type transformation = (tables :: <-stream) -> stream + +### Sources + +Sources are functions that produce a stream. + + type source = () -> stream + +### Built-in sources and transformations The following functions are preassigned in the universe block. -These functions each define a transformation. +These functions each define a transformation or a source. #### From @@ -2757,6 +2829,56 @@ If you need to convert other columns use the `map` function directly with the `u [IMPL#242](https://github.com/influxdata/platform/issues/242) Update specification around type conversion functions. +#### Errors + +Errors selects error tables from the meta channel and places them on the data channel. +The input data channel is dropped. +The output meta channel is empty. + +Example: + + // Perform a simple downsample + from(bucket:"telegraf/autogen") + |> range(start: -5m) + |> aggregateWindow(every: 1m, fn: mean) + |> to(bucket: "telegraf/5m") + |> errors() + // Send all errors during downsampling to S3 + |> s3.to(bucket: "example.com/errors") + + +#### Meta + +Meta places all tabels from the meta channel into the data channel. +The input data channel is dropped. +The output meta channel is empty. + +Example: + + // The errors function is implemented in terms of the meta function. + errors = (tables=<-) => + tables + |> meta() + |> filter(fn: (r) => r.name == "errors") + + +#### Stats + +Stats selects stats tables from the meta channel and places them on the data channel. +The input data channel is dropped. +The output meta channel is empty. + +Example: + + // Ship data to S3 + from(bucket:"telegraf/autogen") + |> range(start: -5m) + |> s3.to(bucket: "example.com/telegraf") + |> stats() + // Write S3 stats back to InfluxDB + |> to(bucket: "stats") + + ### Composite data types @@ -2793,6 +2915,49 @@ A query specification defines what data and operations to perform. The execution model reserves the right to perform those operations as efficiently as possible. The execution model may rewrite the query in anyway it sees fit while maintaining correctness. +## Handling data errors + +Data errors are errors that occur when processing data from streams. +When a data error occurs there are a few methods for handling the errors. + +First, there is the `fail` function that terminates the program. +This can be used whenever an error occurs that is not recoverable in any form. + +Second, there is the `errors` function to provides direct access to the errors. +Using the `errors` function any behavior Flux can describe is a possible method for handling errors. + +There is an option `errorHandler` that defines the default behavior for all data errors. +The option value is a transformation function that consumes errors produced by other transformations. + +The default behavior is to fail when an error occurs. + + option errorHandler = (tables=<-) => + tables + // Fail on any error + |> map(fn: (r) => fail(message: r.message)) + + // Change the default to log errors to the query system bucket. + // Maybe this should just take an explicit bucket instead of having a system bucket? + option errorHandler = influxdb.log + +The errorHandler transformation is automatically appended to the end of any transformation chain. +To disable this global behavior set the errorHandler to a noop: + + // Disable global error handling + option errorHandler = (tables=<-) => tables + +Using the `errors` transformation function specific error handling may be applied to a specific section of a Flux program. + + // Perform a simple downsample + from(bucket:"telegraf/autogen") + |> range(start: -5m) + |> aggregateWindow(every: 1m, fn: mean) + |> to(bucket: "telegraf/5m") + |> errors() + // Send all errors during downsampling to S3 + |> s3.to(bucket: "example.com/errors") + + ## Request and Response Formats Included with the specification of the language and execution model, is a specification of how to submit queries and read their responses over HTTP.