You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
This article goes through very specific and simple steps to learn how Stream Processor works. For simplicity it uses a custom Docker image that contains the relevant components for testing.
3
+
Follow this tutorial to learn more about stream processing.
4
4
5
5
## Requirements
6
6
7
-
The following tutorial requires the following software components:
7
+
This tutorial requires the following components:
8
8
9
-
*[Fluent Bit](https://fluentbit.io)>= v1.2.0
10
-
*[Docker Engine](https://www.docker.com/products/docker-engine)\(not mandatory if you already have Fluent Bit binary installed in your system\)
* A stream processing [sample file](https://raw.githubusercontent.com/fluent/fluent-bit-docs/37b477786d6e28eb223e08611c26ec93671a34ac/stream-processing/samples/sp-samples-1k.log)
11
12
12
-
In addition download the following data [sample file](https://raw.githubusercontent.com/fluent/fluent-bit-docs/37b477786d6e28eb223e08611c26ec93671a34ac/stream-processing/samples/sp-samples-1k.log)\(130KB\).
13
+
## Steps
13
14
14
-
## Stream Processing using the command line
15
-
16
-
For all next steps we will run Fluent Bit from the command line, and for simplicity we will use the official Docker image.
15
+
These steps use the official Fluent Bit Docker image.
17
16
18
17
### 1. Fluent Bit version
19
18
19
+
Run the following command to confirm that Fluent Bit is installed and up-to-date:
20
+
20
21
```bash
21
22
$ docker run -ti fluent/fluent-bit:1.4 /fluent-bit/bin/fluent-bit --version
22
23
Fluent Bit v1.8.2
23
24
```
24
25
25
26
### 2. Parse sample files
26
27
27
-
The samples file contains JSON records. On this command, we are appending the Parsers configuration file and instructing _tail_input plugin to parse the content as _json_:
28
+
The sample file contains JSON records. Run the following command to append the `parsers.conf`file and instruct the Tail input plugin to parse content as JSON:
28
29
29
30
```bash
30
31
$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
The command above will simply print the parsed content to the standard output interface. The content will print the _Tag_associated to each record and an array with two fields: record timestamp and record map:
40
+
This command prints the parsed content to the standard output interface. The parsed content includes a tag associated with each record and an array with two fields: a timestamp and a record map:
As of now there is no Stream Processing, on step \#3 we will start doing some basic queries.
62
-
63
-
### 3. Selecting specific record keys
62
+
### 3. Select specific record keys
64
63
65
-
This command introduces a Stream Processor \(SP\)query through the **-T** option and changes the output plugin to _null_, this is done with the purpose of obtaining the SP results in the standard output interface and avoid confusions in the terminal.
64
+
Run the following command to create a stream processor query using the `-T` flag and change the output to the Null plugin. This obtains the stream processing results in the standard output interface and avoids confusion in the terminal.
66
65
67
66
```bash
68
67
$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
The query above aims to retrieve all records that a key named _country_ value matches the value _Chile_, and for each match compose and output a record using only the key fields _word_and _num_:
79
+
The previous query aims to retrieve all records for which the `country` key contains the value `Chile`. For each match, it composes and outputs a record that only contains the keys `word`and `num`:
The following query is similar to the one in the previous step, but this time we will use the aggregation function called AVG\(\)to get the average value of the records ingested:
91
+
Run the following command to use the `AVG`aggregation function to get the average value of ingested records:
93
92
94
93
```bash
95
94
$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
why did we get multiple records? Answer: When Fluent Bit processes the data, records come in chunks and the Stream Processor runs the process over chunks of data, so the input plugin ingested 5 chunks of records and SP processed the query for each chunk independently. To process multiple chunks at once we have to group results during windows of time.
116
+
{% hint style="info" %}
117
+
The resulting output contains multiple records because Fluent Bit processes data in chunks, and the stream processor processes each chunk independently. To process multiple chunks at the same time, you can group results using time windows.
118
+
{% endhint %}
118
119
119
-
### 5. Grouping Results and Window
120
+
### 5. Group results and windows
120
121
121
-
Grouping results aims to simplify data processing and when used in a defined window of time we can achieve great things. The next query group the results by _country_ and calculate the average of _num_ value, the processing window is 1 second which basically means: process all incoming chunks coming within 1 second window:
122
+
Grouping results within a time window simplifies data processing. Run the following command to group results by `country` and calculate the average of `num` with a one-second processing window:
122
123
123
124
```bash
124
125
$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
### 6. Ingest Stream Processor results as new Stream of Data
146
+
### 6. Ingest stream processor results as a new stream of data
146
147
147
-
Now we see a more real-world use case. Sending data results to the standard output interface is good for learning purposes, but now we will instruct the Stream Processor to ingest results as part of Fluent Bit data pipeline and attach a Tag to them.
148
+
Next, instruct the stream processor to ingest results as part of the Fluent Bit data pipeline and assign a tag to each record.
148
149
149
-
This can be done using the **CREATE STREAM** statement that will also tag results with **sp-results** value. Note that output plugin parameter is now _stdout_ matching all records tagged with _sp-results_:
150
+
Run the following command, which uses a `CREATE STREAM` statement to tag results with the `sp-results` tag, then outputs records with that tag to standard output:
150
151
151
152
```bash
152
153
$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
Fluent Bit have the notion of streams, and every input plugin instance gets a default name. You can override that behavior by setting an alias. Check the **alias** parameter and new **stream** name in the following example:
180
-
181
-
```bash
182
-
$ docker run -ti -v `pwd`/sp-samples-1k.log:/sp-samples-1k.log \
183
-
fluent/fluent-bit:1.8.2 \
184
-
/fluent-bit/bin/fluent-bit \
185
-
-R /fluent-bit/etc/parsers.conf \
186
-
-i tail \
187
-
-p path=/sp-samples-1k.log \
188
-
-p parser=json \
189
-
-p read_from_head=true \
190
-
-p alias=samples \
191
-
-T "CREATE STREAM results WITH (tag='sp-results') \
0 commit comments