
Commit

update(site): update docs
fhussonnois committed Aug 12, 2020
1 parent f479ab7 commit 70151e9
Showing 8 changed files with 111 additions and 80 deletions.
@@ -1,5 +1,5 @@
---
date: 2020-05-21
date: 2020-08-12
title: "Accessing Data and Metadata"
linkTitle: "Accessing Data and Metadata"
weight: 60
@@ -11,56 +11,78 @@ Some filters (e.g : [AppendFilter](#appendfilter)) can be configured using *Simple Connect Expression Language*.

*Simple Connect Expression Language* (ScEL for short) is an expression language based on regex that allows quick access and manipulating record fields and metadata.

The syntax to define an expression is of the form : "`{{ <expression string> }}`".

{{% alert title="Note" color="info" %}}
In some situations double brackets can be omitted if the expression is used to write a value into a target field.
{{% /alert %}}
The syntax to define an expression is one of the following forms : `<expression string>` or `"{{ <expression string> }}"`.

ScEL supports the following capabilities :

* **Literal expressions**
* **Field Selector**
* **Nested Navigation**
* **String substitution**
* **Functions**

## Literal expressions

* String : `'Hello World'`
* Number : `42`
* Boolean: `True`
* Nullable: `null`
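As a hedged illustration (the filter name and target field below are invented for the example), a literal can be written directly into a record field with an `AppendFilter` :

```properties
# Illustrative only: appends the literal string 'Hello World'
# to a field named 'greeting' on every record.
filters=SetGreeting
filters.SetGreeting.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.SetGreeting.field=$.greeting
filters.SetGreeting.value='Hello World'
```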

## Field Selector

The expression language can be used to easily select one field from the input record :

"`{{ username }}`"
`$.username`

## Nested Navigation

To navigate down a struct value, just use a period to indicate a nested field value :

"`{{ address.city }}`"
`$.address.city`
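The sketch below illustrates the two selector forms above in an `AppendFilter` configuration; the filter names and record fields are assumptions made for the example :

```properties
# Illustrative only: copy the top-level field $.username and the nested
# field $.address.city into two new fields of the output record.
filters=CopyUser,CopyCity
filters.CopyUser.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.CopyUser.field=$.user
filters.CopyUser.value={{ $.username }}
filters.CopyCity.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.CopyCity.field=$.city
filters.CopyCity.value={{ $.address.city }}
```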

## String substitution

The expression language can be used to easily build a new string field that concatenates multiple fields :

"`{{ <expression one> }}-{{ <expression two>}}`"
`The user {{ $.username }} is living in city {{ $.address.city }}`

## Function

The expression language supports function calls :

`The user {{ $.username }} is living in city {{ uppercase($.address.city) }}`
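For example, such a template could be used as the value of an `AppendFilter`; this is only a sketch and the field names are assumed for the example :

```properties
# Illustrative only: build a human-readable summary field from the
# username and the uppercased city of the record.
filters.BuildSummary.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.BuildSummary.field=$.summary
filters.BuildSummary.value="The user {{ $.username }} is living in city {{ uppercase($.address.city) }}"
```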

## Dynamic Field Selector

String substitution can be used to dynamically select a field :

The example below shows how to dynamically build a field selector by concatenating `$.` and
the first element present in the array field `$.values`.

`{{ '$.'extract_array($.values, 0) }}`


Note the use of double quotes to define a substitution expression.
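The following minimal sketch shows where such a dynamic selector could be used; the filter name and the `$.message` field are assumptions made for the example :

```properties
# Illustrative only: the target field name is resolved at runtime from
# the first element of the array field $.values.
filters.DynamicTarget.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.DynamicTarget.field="{{ '$.'extract_array($.values, 0) }}"
filters.DynamicTarget.value={{ $.message }}
```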

## Built-in Functions

ScEL supports a number of predefined functions that can be used to apply a single transformation on a field.

| Function | Description | Syntax |
| ---------------| --------------|-----------|
| `contains`     | Returns `true` if an array field's value contains the specified value | `{{ contains(array, value) }}` |
| `contains`     | Returns `true` if an array field's value contains the specified value | `{{ contains(array, 'value') }}` |
| `converts`     | Converts a field's value into the specified type | `{{ converts(field, INTEGER) }}` |
| `ends_with`    | Returns `true` if a string field's value ends with the specified string suffix | `{{ ends_with(field, suffix) }}` |
| `ends_with`    | Returns `true` if a string field's value ends with the specified string suffix | `{{ ends_with(field, 'suffix') }}` |
| `equals`       | Returns `true` if a string or number field's value equals the specified value | `{{ equals(field, value) }}` |
| `exists`       | Returns `true` if the specified field exists | `{{ ends_with(field, value) }}` |
| `extract_array`| Returns the element at the specified position of the specified array | `{{ extract_array(array, 0) }}` |
| `is_null`      | Returns `true` if a field's value is null | `{{ is_null(field) }}` |
| `length`       | Returns the number of elements in an array or the length of a string field | `{{ length(array) }}` |
| `lowercase`    | Converts all of the characters in a string field's value to lower case | `{{ lowercase(field) }}` |
| `matches`      | Returns `true` if a field's value matches the specified regex | `{{ matches(field, regex) }}` |
| `matches`      | Returns `true` if a field's value matches the specified regex | `{{ matches(field, 'regex') }}` |
| `nlv`          | Sets a default value if a field's value is null | `{{ nlv(field, default) }}` |
| `replace_all`  | Replaces every subsequence of the field's value that matches the given pattern with the given replacement string. | `{{ replace_all(field, regex, replacement) }}` |
| `starts_with`  | Returns `true` if a string field's value starts with the specified string prefix | `{{ starts_with(field, prefix) }}` |
| `replace_all`  | Replaces every subsequence of the field's value that matches the given pattern with the given replacement string. | `{{ replace_all(field, 'regex', 'replacement') }}` |
| `starts_with`  | Returns `true` if a string field's value starts with the specified string prefix | `{{ starts_with(field, 'prefix') }}` |
| `trim`         | Trims the spaces from the beginning and end of a string. | `{{ trim(field) }}` |
| `uppercase`    | Converts all of the characters in a string field's value to upper case | `{{ uppercase(field) }}` |
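These functions are typically used inside conditions or value expressions. The hedged sketch below combines `starts_with` with a `DropFilter` condition (the filter name and the `$.level` field are illustrative) :

```properties
# Illustrative only: keep records whose $.level field starts with 'ERR'
# and drop everything else (invert=true keeps the matching records).
filters.KeepErrors.type=io.streamthoughts.kafka.connect.filepulse.filter.DropFilter
filters.KeepErrors.if={{ starts_with($.level, 'ERR') }}
filters.KeepErrors.invert=true
```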

@@ -70,7 +92,7 @@ In addition, ScEL supports nested functions.
For example, the following expression is used to replace all whitespace characters after transforming our field's value into lowercase.

```
{{ replace_all(lowercase(field), \\s, -)}}
replace_all(lowercase($.field), '\\s', '-')
```

{{% alert title="Limitation" color="warning" %}}
@@ -80,37 +102,36 @@ Currently, FilePulse does not support user-defined functions (UDFs). So you cann

## Scopes


In the previous section, we have shown how to use the expression language to select a specific field.
The selected field was part of the current record being processed.

Actually, ScEL allows you to access additional fields through the use of scopes.
Basically, a scope defines the root object on which a selector expression must be evaluated.

The syntax to define an expression with a scope is of the form : "`{{ $<scope>.<selector expression string> }}`".
The syntax to define an expression with a scope is of the form : "`$<scope>.<selector expression string>`".

By default, if no scope is defined in the expression, the scope `$value` is implicitly used.

ScEL supports a number of predefined scopes that can be used for example :

- **To override the output topic.**
- **To define the record key to be used.**
- **To get access to the source file metadata.**
- **To define the topic for the record.**
- **To define the key for the record.**
- **To get access to metadata about the source file.**
- Etc.

| Scope | Description | Type |
|--- | --- |--- |
| `{{ $headers }}` | The record headers | - |
| `{{ $key }}` | The record key | `string` |
| `{{ $metadata }}` | The file metadata | `struct` |
| `{{ $offset }}` | The offset information of this record into the source file | `struct` |
| `{{ $system }}` | The system environment variables and runtime properties | `struct` |
| `{{ $timestamp }}` | The record timestamp | `long` |
| `{{ $topic }}` | The output topic | `string` |
| `{{ $value }}` | The record value| `struct` |
| `{{ $variables }}` | The contextual filter-chain variables| `map[string, object]` |

Note that, in case of failures, more fields are added to the current filter context (see : [Handling Failures](./handling-failures)).
| `$headers` | The record headers | - |
| `$key` | The record key | `string` |
| `$metadata` | The file metadata | `struct` |
| `$offset` | The offset information of this record into the source file | `struct` |
| `$system` | The system environment variables and runtime properties | `struct` |
| `$timestamp` | The record timestamp | `long` |
| `$topic` | The output topic | `string` |
| `$value` | The record value| `struct` |
| `$variables` | The contextual filter-chain variables| `map[string, object]` |

Note that, in case of failures, more fields are added to the current filter context (see : [Handling Failures](/kafka-connect-file-pulse/docs/developer-guide/handling-failures/)).
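For example, writing into a scope other than `$value` makes it possible to change record metadata; the minimal sketch below sets the output topic from a record field (the `$.type` field and the filter name are assumptions for the example) :

```properties
# Illustrative only: route each record to a topic derived from its $.type field.
filters.RouteByType.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.RouteByType.field=$topic
filters.RouteByType.value="my-topic-{{ lowercase($.type) }}"
```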

### Record Headers

@@ -126,13 +147,13 @@ The scope `metadata` allows read access to information about the file being processed.

| Predefined Fields (ScEL) | Description | Type |
|--- | --- |--- |
| `{{ $metadata.name }}` | The file name | `string` |
| `{{ $metadata.path }}` | The file directory path | `string` |
| `{{ $metadata.absolutePath }}` | The file absolute path | `string` |
| `{{ $metadata.hash }}` | The file CRC32 hash | `int` |
| `{{ $metadata.lastModified }}` | The file last modified time. | `long` |
| `{{ $metadata.size }}` | The file size | `long` |
| `{{ $metadata.inode }}` | The file Unix inode | `long` |
| `$metadata.name` | The file name | `string` |
| `$metadata.path` | The file directory path | `string` |
| `$metadata.absolutePath` | The file absolute path | `string` |
| `$metadata.hash` | The file CRC32 hash | `int` |
| `$metadata.lastModified` | The file last modified time. | `long` |
| `$metadata.size` | The file size | `long` |
| `$metadata.inode` | The file Unix inode | `long` |
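As a hedged example (the target field name is invented), the source file name can be copied onto every record through the `$metadata` scope :

```properties
# Illustrative only: add the name of the file being processed to each record.
filters.AddSourceFile.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.AddSourceFile.field=$.source_file
filters.AddSourceFile.value={{ $metadata.name }}
```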

## Record Offset

@@ -141,56 +162,56 @@ The available fields depend on the configured FileInputRecord.

| Predefined Fields (ScEL) | Description | Type |
|--- | --- |--- |
| `{{ $offset.timestamp }}` | The creation time of the record (millisecond) | `long` |
| `$offset.timestamp` | The creation time of the record (millisecond) | `long` |

Information only available if `RowFilterReader` is configured.

| Predefined Fields (ScEL) | Description | Type |
|--- | --- |--- |
| `{{ $offset.startPosition }}` | The start position of the record into the source file | `long` |
| `{{ $offset.endPosition }}` | The end position of the record into the source file | `long` |
| `{{ $offset.size }}` | The size in bytes | `long` |
| `{{ $offset.row }}` | The row number of the record into the source | `long` |
| `$offset.startPosition` | The start position of the record into the source file | `long` |
| `$offset.endPosition` | The end position of the record into the source file | `long` |
| `$offset.size` | The size in bytes | `long` |
| `$offset.row` | The row number of the record into the source | `long` |

Information only available if `BytesArrayInputReader` is configured.

| Predefined Fields (ScEL) | Description | Type |
|--- | --- |--- |
| `{{ $offset.startPosition }}` | The start position of the record into the source file (always equals to 0) | `long` |
| `{{ $offset.endPosition }}` | The end position of the record into the source file (equals to the file size) | `long` |
| `$offset.startPosition` | The start position of the record into the source file (always equals to 0) | `long` |
| `$offset.endPosition` | The end position of the record into the source file (equals to the file size) | `long` |

Information only available if `AvroFilterInputReader` is configured.

| Predefined Fields (ScEL) | Description | Type |
|--- | --- |--- |
| `{{ $offset.blockStart }}` | The start position of the current block | `long` |
| `{{ $offset.position }}` | The position into the current block. | `long` |
| `{{ $offset.records }}` | The number of records read into the current block. | `long` |
| `$offset.blockStart` | The start position of the current block | `long` |
| `$offset.position` | The position into the current block. | `long` |
| `$offset.records` | The number of records read into the current block. | `long` |

## System

The scope `system` allows read access to system environment variables and runtime properties.

| Predefined Fields (ScEL) | Description | Type |
|--- | --- |--- |
| `{{ $system.env }}` | The system environment variables. | `map[string, string]` |
| `{{ $system.props }}` | The system environment properties. | `map[string, string]` |
| `$system.env` | The system environment variables. | `map[string, string]` |
| `$system.props` | The system environment properties. | `map[string, string]` |
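The minimal sketch below records the worker's `HOSTNAME` environment variable on each record; it assumes map entries can be addressed with dot notation, which may differ depending on the version :

```properties
# Illustrative only (assumes dot notation works for map keys):
# append the HOSTNAME environment variable to every record.
filters.AddHost.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.AddHost.field=$.hostname
filters.AddHost.value={{ $system.env.HOSTNAME }}
```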

## Timestamp

The scope `timestamp` allows you to define the timestamp of the output record.
The scope `$timestamp` allows you to define the timestamp of the output record.

## Topic

The scope `topic` allows you to define the target topic of the output record.
The scope `$topic` allows you to define the target topic of the output record.

## Value

The scope `value` allows you to define the fields of the output record.
The scope `$value` allows you to define the fields of the output record.
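Since `$value` is the implicit default scope, the two value expressions in the sketch below select the same field; the filter and field names are invented for the example :

```properties
# Illustrative only: both expressions read the same field of the record value.
filters.CopyExplicit.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.CopyExplicit.field=$.copy
filters.CopyExplicit.value={{ $value.username }}
# Equivalent, relying on the implicit $value scope:
# filters.CopyExplicit.value={{ $.username }}
```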

## Variables

The scope `variables` allows read/write access to a simple key-value map structure.
This scope can be used to share user-defined variables between filters.
The scope `$variables` allows read/write access to a simple key-value map structure.
This scope can be used to share user-defined variables between [Processing Filters](/kafka-connect-file-pulse/docs/developer-guide/filters/).

Note : variables are not cached between records.
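A hypothetical sketch, assuming a filter can write into the `$variables` scope as the table above implies; the variable and field names are invented :

```properties
# Illustrative only: a first filter stores a value in the filter-chain
# variables, and a later filter reads it back into the record value.
filters=StoreVar,ReadVar
filters.StoreVar.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.StoreVar.field=$variables.firstValue
filters.StoreVar.value={{ extract_array($.values, 0) }}
filters.ReadVar.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.ReadVar.field=$.first
filters.ReadVar.value={{ $variables.firstValue }}
```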
@@ -1,5 +1,5 @@
---
date: 2020-05-21
date: 2020-08-12
title: "File Readers"
linkTitle: "File Readers"
weight: 40
@@ -33,4 +33,10 @@ The following provides usage information for `io.streamthoughts.kafka.connect.fi

The `XMLFileInputReader` is used to read XML files.

The following provides usage information for `io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader` ([source code](https://github.com/streamthoughts/kafka-connect-file-pulse/blob/master/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/reader/XMLFileInputReader.java))

## FileInputMetadataReader

The `FileInputMetadataReader` is used to send a single record per file containing metadata (e.g., `name`, `path`, `hash`, `lastModified`, `size`, etc.).

The following provides usage information for `io.streamthoughts.kafka.connect.filepulse.reader.FileInputMetadataReader` ([source code](https://github.com/streamthoughts/kafka-connect-file-pulse/blob/master/connect-file-pulse-plugin/src/main/java/io/streamthoughts/kafka/connect/filepulse/reader/FileInputMetadataReader.java))
@@ -27,8 +27,8 @@ filters.GroupMultilineException.negate=false
filters.GroupMultilineException.pattern="^[\\t]"
filters.ExtractFirstLine.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.ExtractFirstLine.field=logmessage
filters.ExtractFirstLine.values=${extractarray(message,0)}
filters.ExtractFirstLine.field=$.logmessage
filters.ExtractFirstLine.values={{ extract_array($.message, 0) }}
filters.ParseLog4jLog.type=io.streamthoughts.kafka.connect.filepulse.filter.impl.GrokFilter
filters.ParseLog4jLog.match="%{TIMESTAMP_ISO8601:logdate} %{LOGLEVEL:loglevel} %{GREEDYDATA:thread} %{GREEDYDATA:logmessage}"
20 changes: 10 additions & 10 deletions site/content/en/docs/Archives/v1.4.x/Developer Guide/filters.md
@@ -1,5 +1,5 @@
---
date: 2020-08-06
date: 2020-08-12
title: "Processing Filters"
linkTitle: "Processing Filters"
weight: 80
@@ -10,7 +10,7 @@ description: >
These filters are available for use with Kafka Connect File Pulse:

| Filter | Description | Since
--- | --- | --- |
|--- | --- | --- |
| [AppendFilter](#appendfilter) | Appends one or more values to an existing or non-existing array field | |
| [ConvertFilter](#convertfilter) | Converts a message field's value to a specific type | |
| [DateFilter](#datefilter) | Converts a field's value containing a date to a unix epoch time | |
@@ -51,7 +51,7 @@ The concat value is then added to a field named `result`.
**Configuration**
```properties
filters.SubstituteFilter.field="result"
filters.SubstituteFilter.value="{{ extract_array(values,0) }}-{{ extract_array(values,1) }}"
filters.SubstituteFilter.value="{{ extract_array($.values,0) }}-{{ extract_array($.values,1) }}"
```

**Input**
@@ -84,7 +84,7 @@ The following configuration shows how to use the `$topic` scope :

```properties
filters.SubstituteFilter.field="$topic"
filters.SubstituteFilter.value="my-topic-{{ lowercase(extract_array(values,0) }}"
filters.SubstituteFilter.value="my-topic-{{ lowercase(extract_array($.values,0)) }}"
```
**Input**
```json
@@ -111,8 +111,8 @@ This allows you to dynamically determine the name of the field to be added.
The following examples show how to use a property expression to get the name of the field from a

```properties
filters.SubstituteFilter.field="{{ target }}"
filters.SubstituteFilter.value="{{ extract_array(values,0) }}-{{ extract_array(values,1) }}"
filters.SubstituteFilter.field="$.target"
filters.SubstituteFilter.value="{{ extract_array($.values, 0) }}-{{ extract_array($.values,1) }}"
```

**Input**
@@ -208,8 +208,8 @@ The `DateFilter` converts a field's value containing a date to a unix epoch time
### Examples
```properties
filters.MyDateFilter.field="date"
filters.MyDateFilter.target="timestamp"
filters.MyDateFilter.field="$.date"
filters.MyDateFilter.target="$.timestamp"
filters.MyDateFilter.format="yyyy-MM-dd'T'HH:mm:ss"
```

@@ -289,7 +289,7 @@ The following example shows the usage of **DropFilter** to only keep records wit
```properties
filters=Drop
filters.Drop.type=io.streamthoughts.kafka.connect.filepulse.filter.DropFilter
filters.Drop.if={{ equals(level, ERROR) }}
filters.Drop.if={{ equals($.level, 'ERROR') }}
filters.Drop.invert=true
```

@@ -384,7 +384,7 @@ The following example shows the usage of **FailFilter** to stop processing a fil
```properties
filters=Fail
filters.Fail.type=io.streamthoughts.kafka.connect.filepulse.filter.FailFilter
filters.Fail.if={{ is_null(user_id) }}
filters.Fail.if={{ is_null($.user_id) }}
filters.Fail.message=Invalid row, user_id is missing : {{ $value }}
```

@@ -1,5 +1,5 @@
---
date: 2020-05-21
date: 2020-08-12
title: "Handling Failures"
linkTitle: "Handling Failures"
weight: 70
@@ -53,8 +53,8 @@ Within an error filter chain, some additional fields are available to each filter.

| Predefined Fields / ScEL | Description | Type |
|--- | --- |--- |
| `{% raw %}{{ $error.message }}{% endraw %}` | The error message | `string` |
| `{% raw %}{{ $error.filter }}{% endraw %}` | The failed filter name | `string` |
| `$error.message` | The error message | `string` |
| `$error.filter` | The failed filter name | `string` |

### Example

@@ -71,6 +71,5 @@ filters.Log4jGrokFilter.withOnFailure=AppendError
filters.AppendError.type=io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter
filters.AppendError.field=errorMessage
filters.AppendError.value="{% raw %}{{ $error.message }}{% endraw %}"
filters.AppendError.value="$error.message"
```