Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ACKs with fluent-bit #181

Closed
elruwen opened this issue Jan 28, 2021 · 8 comments
Closed

ACKs with fluent-bit #181

elruwen opened this issue Jan 28, 2021 · 8 comments
Assignees

Comments

@elruwen
Copy link
Contributor

elruwen commented Jan 28, 2021

Hi!

I am trying to use fluency with ACKs and fluent-bit, but that doesn't work. I digged a bit around and it looks like fluency is encoding the chunk uuid as binary and not as a string.

It fails in this line: https://github.com/fluent/fluent-bit/blob/master/plugins/in_forward/fw_prot.c#L161 The type send to fluent-bit is binary, but if I understand the specs correctly, it should be string https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#entry. Currently the value we get is 0x08, which is binary: https://github.com/msgpack/msgpack-c/blob/c_master/include/msgpack/object.h#L41

Not sure if fluency is also supporting fluent-bit...

Relevant dependencies I use:

[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.8:compile
[INFO] |  |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.8:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.9.8:compile
[INFO] +- org.komamitsu:fluency-core:jar:2.5.0:compile
[INFO] |  +- org.msgpack:jackson-dataformat-msgpack:jar:0.8.21:runtime
[INFO] |  |  \- org.msgpack:msgpack-core:jar:0.8.21:runtime
[INFO] |  +- org.komamitsu:phi-accural-failure-detector:jar:0.0.5:runtime
[INFO] |  \- net.jodah:failsafe:jar:2.4.0:runtime

To reproduce, the following fluent-bit config is enough:

[INPUT]
    Name forward
    Listen 0.0.0.0
    Port 24224

[OUTPUT]
    name stdout
    match *
    format json_lines

Code:

        FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
        builder.setAckResponseMode(true);
        Fluency fluency = builder

                .build("localhost", 24224);
        EventTime eventTime = EventTime.fromEpochMilli(System.currentTimeMillis());
        fluency.emit(
                "testtag", eventTime,
                Collections.singletonMap("log", "test"));
        fluency.close();

Cheers
Ruwen

@komamitsu komamitsu self-assigned this Jan 31, 2021
@komamitsu
Copy link
Owner

Thanks for reporting it. I'll try to reproduce this issue.

@elruwen
Copy link
Contributor Author

elruwen commented Feb 1, 2021

I had a look at the code:

  • I would make the passed around ack token a String (in org.komamitsu.fluency.fluentd.ingester.sender.FluentdSender#sendInternal).
  • In org.komamitsu.fluency.fluentd.ingester.sender.RequestOption I would change the type to String
  • In org.komamitsu.fluency.fluentd.ingester.FluentdIngester#ingest I would change the chunk generation to
            UUID uuid = UUID.randomUUID();
            ByteBuffer uuidByteBuffer = ByteBuffer.wrap(new byte[16]);
            uuidByteBuffer.putLong(uuid.getMostSignificantBits());
            uuidByteBuffer.putLong(uuid.getLeastSignificantBits());
            byte[] uuidBytes = uuidByteBuffer.array();
            String uuidBase64Encoded = Base64.getEncoder().encodeToString(uuidBytes);

            ByteBuffer optionBuffer = ByteBuffer.wrap(objectMapper.writeValueAsBytes(new RequestOption(dataLength, uuidBase64Encoded)));
            List<ByteBuffer> buffers = Arrays.asList(headerBuffer, dataBuffer, optionBuffer);

            synchronized (sender) {
                sender.sendWithAck(buffers, uuidBase64Encoded);
            }
  • The String always holds the base64 encoded token. And when the ack token is received from fluentd, I would just compare the base64 encoded version (I would not convert it back to a UUID, only when required for the UnmatchedAckException exception.

I am happy to put up a PR with those changes if you agree that we got an issue and with my solution proposal ;)

@komamitsu
Copy link
Owner

@elruwen Your plan looks good! Thanks.

Let me add a comment on the plan.

This issue happens because fluent-bit expects ACK token called chunk is MessagePack String not Binary at https://github.com/fluent/fluent-bit/blob/bb0c95b4d846dc90b9e4072f081c7fff9a2bdad6/plugins/in_forward/fw_prot.c#L162 and doesn't validate the value itself. So, I don't think Fluency needs to send Base64 format value for now.

Also strictly speaking, in fluent-bit the source of chunk is a SHA-512 of the data sent to in_forward https://github.com/fluent/fluent-bit/blob/bb0c95b4d846dc90b9e4072f081c7fff9a2bdad6/plugins/out_forward/forward_format.c#L96. The 3rd change item you suggested is a bit incomplete and I think we can skip the change since sending Base64 format chunk isn't required for now.

elruwen added a commit to elruwen/fluency that referenced this issue Feb 4, 2021
elruwen added a commit to elruwen/fluency that referenced this issue Feb 4, 2021
elruwen added a commit to elruwen/fluency that referenced this issue Feb 4, 2021
elruwen added a commit to elruwen/fluency that referenced this issue Feb 4, 2021
elruwen added a commit to elruwen/fluency that referenced this issue Feb 11, 2021
komamitsu added a commit that referenced this issue Feb 11, 2021
fix-#181: Send the ack token as String
@komamitsu
Copy link
Owner

This issue is fixed by #182

@elruwen
Copy link
Contributor Author

elruwen commented Feb 12, 2021

This issue is fixed by #182

Thanks for merging it, can we get a release? :)

@komamitsu
Copy link
Owner

I'll release the fix in a few days.

@komamitsu
Copy link
Owner

Released this change as 2.5.1. It'll be visible in a few hours on Maven Central.

@elruwen
Copy link
Contributor Author

elruwen commented Feb 15, 2021

Awesome, thanks for the quick turn-around :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants