Skip to content

Commit 6a7cc9b

Browse files
author
Pat
authored
msgpack: examples for TCP and other info (fluent#707)
* msgpack: examples for TCP and other info Signed-off-by: Patrick Stephens <pat@calyptia.com> * msgpack: review comments Signed-off-by: Patrick Stephens <pat@calyptia.com>
1 parent 073e48e commit 6a7cc9b

File tree

3 files changed

+98
-20
lines changed

3 files changed

+98
-20
lines changed

concepts/key-concepts.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ Source events can have or not have a structure. A structure defines a set of _ke
103103
At a low level both are just an array of bytes, but the Structured message defines _keys_ and _values_, having a structure helps to implement faster operations on data modifications.
104104

105105
{% hint style="info" %}
106-
Fluent Bit **always** handles every Event message as a structured message. For performance reasons, we use a binary serialization data format called [MessagePack](https://msgpack.org/).
106+
Fluent Bit **always** handles every Event message as a structured message.
107+
For performance reasons, we use a binary serialization data format called [MessagePack](https://msgpack.org/).
107108

108109
Consider [MessagePack](https://msgpack.org/) as a binary version of JSON on steroids.
109110
{% endhint %}

development/msgpack-format.md

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Msgpack format
2+
3+
Fluent Bit **always** handles every Event message as a structured message using a binary serialization data format called [MessagePack](https://msgpack.org/).
4+
5+
## Fluent Bit usage
6+
7+
MessagePack is a standard and well-defined format, refer to the official documentation for full details.
8+
This section provides an overview of the specific types used by Fluent Bit within the format to help anyone consuming it.
9+
10+
* The data structure used by Fluent Bit is a 2-length [`fixarray`](https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family) of the timestamp and the data.
11+
* The timestamp comes from [`flb_time_append_to_msgpack`])(https://github.com/fluent/fluent-bit/blob/2138cee8f4878733956d42d82f6dcf95f0aa9339/src/flb_time.c#L197), so it’s either a `uint64`, a `float64`, or a [`fixext`](https://github.com/msgpack/msgpack/blob/master/spec.md#ext-format-family) where the 4 MSBs are the seconds (big-endian `uint32`) and 4 LSBs are nanoseconds.
12+
* The data itself is just a [`msgpack` map](https://github.com/msgpack/msgpack/blob/master/spec.md#map-format-family) with the keys as strings.
13+
14+
## Example
15+
16+
Set up Fluent Bit to send in `msgpack` format to a specific port.
17+
18+
```bash
19+
$ docker run --rm -it --network=host fluent/fluent-bit /fluent-bit/bin/fluent-bit -i cpu -o tcp://127.0.0.1:5170 -p format=msgpack -v
20+
21+
```
22+
23+
We could send this to stdout but as it is a serialized format you would end up with strange output.
24+
As an example we use the [Python msgpack library](https://msgpack.org/#languages) to deal with it:
25+
26+
```python
27+
#Python3
28+
import socket
29+
import msgpack
30+
31+
unpacker = msgpack.Unpacker(use_list=False, raw=False)
32+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
33+
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
34+
s.bind(("127.0.0.1", 5170))
35+
s.listen(1)
36+
connection, address = s.accept()
37+
38+
while True:
39+
data = connection.recv(1024)
40+
if not data:
41+
break
42+
unpacker.feed(data)
43+
for unpacked in unpacker:
44+
print(unpacked)
45+
```
46+
47+
Now make sure to install `msgpack` for Python (3) and then run up your test code:
48+
49+
```bash
50+
$ pip install msgpack
51+
$ python3 test.py
52+
(ExtType(code=0, data=b'b\n5\xc65\x05\x14\xac'), {'cpu_p': 0.1875, 'user_p': 0.125, 'system_p': 0.0625, 'cpu0.p_cpu': 0.0, 'cpu0.p_user': 0.0, 'cpu0.p_system': 0.0, 'cpu1.p_cpu': 0.0, 'cpu1.p_user': 0.0, 'cpu1.p_system': 0.0, 'cpu2.p_cpu': 1.0, 'cpu2.p_user': 0.0, 'cpu2.p_system': 1.0, 'cpu3.p_cpu': 0.0, 'cpu3.p_user': 0.0, 'cpu3.p_system': 0.0, 'cpu4.p_cpu': 0.0, 'cpu4.p_user': 0.0, 'cpu4.p_system': 0.0, 'cpu5.p_cpu': 0.0, 'cpu5.p_user': 0.0, 'cpu5.p_system': 0.0, 'cpu6.p_cpu': 0.0, 'cpu6.p_user': 0.0, 'cpu6.p_system': 0.0, 'cpu7.p_cpu': 0.0, 'cpu7.p_user': 0.0, 'cpu7.p_system': 0.0, 'cpu8.p_cpu': 0.0, 'cpu8.p_user': 0.0, 'cpu8.p_system': 0.0, 'cpu9.p_cpu': 1.0, 'cpu9.p_user': 1.0, 'cpu9.p_system': 0.0, 'cpu10.p_cpu': 0.0, 'cpu10.p_user': 0.0, 'cpu10.p_system': 0.0, 'cpu11.p_cpu': 0.0, 'cpu11.p_user': 0.0, 'cpu11.p_system': 0.0, 'cpu12.p_cpu': 0.0, 'cpu12.p_user': 0.0, 'cpu12.p_system': 0.0, 'cpu13.p_cpu': 0.0, 'cpu13.p_user': 0.0, 'cpu13.p_system': 0.0, 'cpu14.p_cpu': 0.0, 'cpu14.p_user': 0.0, 'cpu14.p_system': 0.0, 'cpu15.p_cpu': 0.0, 'cpu15.p_user': 0.0, 'cpu15.p_system': 0.0})
53+
54+
```

pipeline/outputs/tcp-and-tls.md

+42-19
Original file line numberDiff line numberDiff line change
@@ -28,36 +28,59 @@ The following parameters are available to configure a secure channel connection
2828

2929
### Command Line
3030

31+
#### JSON format
32+
3133
```bash
3234
$ bin/fluent-bit -i cpu -o tcp://127.0.0.1:5170 -p format=json_lines -v
3335
```
3436

35-
We have specified to gather [CPU](https://github.com/fluent/fluent-bit-docs/tree/16f30161dc4c79d407cd9c586a0c6839d0969d97/pipeline/input/cpu.md) usage metrics and send them in JSON lines mode to a remote end-point using netcat service, e.g:
36-
37-
#### Start the TCP listener
37+
We have specified to gather [CPU](https://github.com/fluent/fluent-bit-docs/tree/16f30161dc4c79d407cd9c586a0c6839d0969d97/pipeline/input/cpu.md) usage metrics and send them in JSON lines mode to a remote end-point using netcat service.
3838

39-
Run the following in a separate terminal, netcat will start listening for messages on TCP port 5170
39+
Run the following in a separate terminal, `netcat` will start listening for messages on TCP port 5170.
40+
Once it connects to Fluent Bit ou should see the output as above in JSON format:
4041

41-
```text
42+
```bash
4243
$ nc -l 5170
44+
{"date":1644834856.905985,"cpu_p":1.1875,"user_p":0.5625,"system_p":0.625,"cpu0.p_cpu":0.0,"cpu0.p_user":0.0,"cpu0.p_system":0.0,"cpu1.p_cpu":1.0,"cpu1.p_user":1.0,"cpu1.p_system":0.0,"cpu2.p_cpu":4.0,"cpu2.p_user":2.0,"cpu2.p_system":2.0,"cpu3.p_cpu":1.0,"cpu3.p_user":0.0,"cpu3.p_system":1.0,"cpu4.p_cpu":1.0,"cpu4.p_user":0.0,"cpu4.p_system":1.0,"cpu5.p_cpu":1.0,"cpu5.p_user":1.0,"cpu5.p_system":0.0,"cpu6.p_cpu":0.0,"cpu6.p_user":0.0,"cpu6.p_system":0.0,"cpu7.p_cpu":3.0,"cpu7.p_user":1.0,"cpu7.p_system":2.0,"cpu8.p_cpu":0.0,"cpu8.p_user":0.0,"cpu8.p_system":0.0,"cpu9.p_cpu":1.0,"cpu9.p_user":0.0,"cpu9.p_system":1.0,"cpu10.p_cpu":1.0,"cpu10.p_user":0.0,"cpu10.p_system":1.0,"cpu11.p_cpu":0.0,"cpu11.p_user":0.0,"cpu11.p_system":0.0,"cpu12.p_cpu":0.0,"cpu12.p_user":0.0,"cpu12.p_system":0.0,"cpu13.p_cpu":3.0,"cpu13.p_user":2.0,"cpu13.p_system":1.0,"cpu14.p_cpu":1.0,"cpu14.p_user":1.0,"cpu14.p_system":0.0,"cpu15.p_cpu":0.0,"cpu15.p_user":0.0,"cpu15.p_system":0.0}
4345
```
4446

45-
Start Fluent Bit
47+
#### Msgpack format
48+
49+
Repeat the JSON approach but using the `msgpack` output format.
4650

4751
```bash
48-
$ bin/fluent-bit -i cpu -o stdout -p format=msgpack -v
49-
Fluent Bit v1.x.x
50-
* Copyright (C) 2019-2020 The Fluent Bit Authors
51-
* Copyright (C) 2015-2018 Treasure Data
52-
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
53-
* https://fluentbit.io
54-
55-
[2016/10/07 21:52:01] [ info] [engine] started
56-
[0] cpu.0: [1475898721, {"cpu_p"=>0.500000, "user_p"=>0.250000, "system_p"=>0.250000, "cpu0.p_cpu"=>0.000000, "cpu0.p_user"=>0.000000, "cpu0.p_system"=>0.000000, "cpu1.p_cpu"=>0.000000, "cpu1.p_user"=>0.000000, "cpu1.p_system"=>0.000000, "cpu2.p_cpu"=>0.000000, "cpu2.p_user"=>0.000000, "cpu2.p_system"=>0.000000, "cpu3.p_cpu"=>1.000000, "cpu3.p_user"=>0.000000, "cpu3.p_system"=>1.000000}]
57-
[1] cpu.0: [1475898722, {"cpu_p"=>0.250000, "user_p"=>0.250000, "system_p"=>0.000000, "cpu0.p_cpu"=>0.000000, "cpu0.p_user"=>0.000000, "cpu0.p_system"=>0.000000, "cpu1.p_cpu"=>1.000000, "cpu1.p_user"=>1.000000, "cpu1.p_system"=>0.000000, "cpu2.p_cpu"=>0.000000, "cpu2.p_user"=>0.000000, "cpu2.p_system"=>0.000000, "cpu3.p_cpu"=>0.000000, "cpu3.p_user"=>0.000000, "cpu3.p_system"=>0.000000}]
58-
[2] cpu.0: [1475898723, {"cpu_p"=>0.750000, "user_p"=>0.250000, "system_p"=>0.500000, "cpu0.p_cpu"=>2.000000, "cpu0.p_user"=>1.000000, "cpu0.p_system"=>1.000000, "cpu1.p_cpu"=>0.000000, "cpu1.p_user"=>0.000000, "cpu1.p_system"=>0.000000, "cpu2.p_cpu"=>1.000000, "cpu2.p_user"=>0.000000, "cpu2.p_system"=>1.000000, "cpu3.p_cpu"=>0.000000, "cpu3.p_user"=>0.000000, "cpu3.p_system"=>0.000000}]
59-
[3] cpu.0: [1475898724, {"cpu_p"=>1.000000, "user_p"=>0.750000, "system_p"=>0.250000, "cpu0.p_cpu"=>1.000000, "cpu0.p_user"=>1.000000, "cpu0.p_system"=>0.000000, "cpu1.p_cpu"=>2.000000, "cpu1.p_user"=>1.000000, "cpu1.p_system"=>1.000000, "cpu2.p_cpu"=>1.000000, "cpu2.p_user"=>1.000000, "cpu2.p_system"=>0.000000, "cpu3.p_cpu"=>1.000000, "cpu3.p_user"=>1.000000, "cpu3.p_system"=>0.000000}]
52+
$ bin/fluent-bit -i cpu -o tcp://127.0.0.1:5170 -p format=msgpack -v
53+
6054
```
6155

62-
No more, no less, it just works.
56+
We could send this to stdout but as it is a serialized format you would end up with strange output.
57+
This should really be handled by a msgpack receiver to unpack as per the details in the developer documentation [here](../../development/msgpack-format.md).
58+
As an example we use the [Python msgpack library](https://msgpack.org/#languages) to deal with it:
59+
60+
```python
61+
#Python3
62+
import socket
63+
import msgpack
64+
65+
unpacker = msgpack.Unpacker(use_list=False, raw=False)
66+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
67+
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
68+
s.bind(("127.0.0.1", 5170))
69+
s.listen(1)
70+
connection, address = s.accept()
71+
72+
while True:
73+
data = connection.recv(1024)
74+
if not data:
75+
break
76+
unpacker.feed(data)
77+
for unpacked in unpacker:
78+
print(unpacked)
79+
```
80+
81+
```bash
82+
$ pip install msgpack
83+
$ python3 test.py
84+
(ExtType(code=0, data=b'b\n5\xc65\x05\x14\xac'), {'cpu_p': 0.1875, 'user_p': 0.125, 'system_p': 0.0625, 'cpu0.p_cpu': 0.0, 'cpu0.p_user': 0.0, 'cpu0.p_system': 0.0, 'cpu1.p_cpu': 0.0, 'cpu1.p_user': 0.0, 'cpu1.p_system': 0.0, 'cpu2.p_cpu': 1.0, 'cpu2.p_user': 0.0, 'cpu2.p_system': 1.0, 'cpu3.p_cpu': 0.0, 'cpu3.p_user': 0.0, 'cpu3.p_system': 0.0, 'cpu4.p_cpu': 0.0, 'cpu4.p_user': 0.0, 'cpu4.p_system': 0.0, 'cpu5.p_cpu': 0.0, 'cpu5.p_user': 0.0, 'cpu5.p_system': 0.0, 'cpu6.p_cpu': 0.0, 'cpu6.p_user': 0.0, 'cpu6.p_system': 0.0, 'cpu7.p_cpu': 0.0, 'cpu7.p_user': 0.0, 'cpu7.p_system': 0.0, 'cpu8.p_cpu': 0.0, 'cpu8.p_user': 0.0, 'cpu8.p_system': 0.0, 'cpu9.p_cpu': 1.0, 'cpu9.p_user': 1.0, 'cpu9.p_system': 0.0, 'cpu10.p_cpu': 0.0, 'cpu10.p_user': 0.0, 'cpu10.p_system': 0.0, 'cpu11.p_cpu': 0.0, 'cpu11.p_user': 0.0, 'cpu11.p_system': 0.0, 'cpu12.p_cpu': 0.0, 'cpu12.p_user': 0.0, 'cpu12.p_system': 0.0, 'cpu13.p_cpu': 0.0, 'cpu13.p_user': 0.0, 'cpu13.p_system': 0.0, 'cpu14.p_cpu': 0.0, 'cpu14.p_user': 0.0, 'cpu14.p_system': 0.0, 'cpu15.p_cpu': 0.0, 'cpu15.p_user': 0.0, 'cpu15.p_system': 0.0})
6385

86+
```

0 commit comments

Comments
 (0)