
feat(mqtt): add MQTT sink #16065

Closed

Conversation

zamazan4ik
Contributor

Continues work from #13587

@neuronull I suggest continuing work on the MQTT sink here since the original author is not against it.

I have tried to fix your comments (at least the ones I am able to fix). Feel free to continue your review.

astro and others added 17 commits September 12, 2022 00:39
Co-authored-by: Kyle Criddle <kyle.criddle@datadoghq.com>
Co-authored-by: Kyle Criddle <kyle.criddle@datadoghq.com>
Co-authored-by: Kyle Criddle <kyle.criddle@datadoghq.com>
- update to the newer metrics infrastructure
- update rumqttc version
- fix compilation errors
- fix formatting

Tested:
- Local build
- add quality_of_service option
- add mqtt.org link
- update documentation

Tested:
- Local build
@netlify

netlify bot commented Jan 22, 2023

Deploy Preview for vrl-playground canceled.

| Name | Link |
| --- | --- |
| 🔨 Latest commit | 272ef31 |
| 🔍 Latest deploy log | https://app.netlify.com/sites/vrl-playground/deploys/63e41d457f6c3b000974e1a9 |

@netlify

netlify bot commented Jan 22, 2023

Deploy Preview for vector-project canceled.

| Name | Link |
| --- | --- |
| 🔨 Latest commit | 272ef31 |
| 🔍 Latest deploy log | https://app.netlify.com/sites/vector-project/deploys/63e41d45ce271c0008d7294f |

@github-actions github-actions bot added the `domain: external docs` and `domain: sinks` labels Jan 22, 2023
@github-actions

Regression Test Results

Run ID: 28afd420-80cc-47d2-850e-9ef7a085578f
Baseline: eeb2bb5
Comparison: cfa1d1c
Total vector CPUs: 7

Explanation

A regression test is an integrated performance test for vector in a repeatable rig, with varying configurations for vector. What follows is a statistical summary of a brief vector run for each configuration across the SHAs given above. The goal of these tests is to determine, quickly, whether vector performance is changed, and to what degree, by a pull request. Where appropriate, units are scaled per-core.

The table below, if present, lists those experiments that have experienced a statistically significant change in their bytes_written_per_cpu_second performance between baseline and comparison SHAs, with 90.0% confidence, OR have been detected as newly erratic. Negative values mean the baseline is faster; positive values mean the comparison is faster. Results that do not exhibit more than a ±5% change in mean bytes_written_per_cpu_second are discarded. An experiment is erratic if its coefficient of variation is greater than 0.1. The abbreviated table is omitted if no interesting changes are observed.

Changes in bytes_written_per_cpu_second with confidence ≥ 90.00% and absolute Δ mean >= ±5%:

| experiment | Δ mean | Δ mean % | confidence |
| --- | --- | --- | --- |
| datadog_agent_remap_datadog_logs_acks | 3.27MiB/CPU-s | 10.99 | 100.00% |

Fine details of change detection per experiment:

| experiment | Δ mean | Δ mean % | confidence | baseline mean | baseline stdev | baseline stderr | baseline outlier % | baseline CoV | comparison mean | comparison stdev | comparison stderr | comparison outlier % | comparison CoV | erratic | declared erratic |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| datadog_agent_remap_datadog_logs_acks | 3.27MiB/CPU-s | 10.99 | 100.00% | 29.72MiB/CPU-s | 8.92MiB/CPU-s | 117.8KiB/CPU-s | 0.0 | 0.300102 | 32.98MiB/CPU-s | 1.24MiB/CPU-s | 16.32KiB/CPU-s | 0.0 | 0.037444 | True | False |
| syslog_splunk_hec_logs | 255.8KiB/CPU-s | 2.85 | 100.00% | 8.76MiB/CPU-s | 401.25KiB/CPU-s | 5.18KiB/CPU-s | 0.0 | 0.044712 | 9.01MiB/CPU-s | 260.83KiB/CPU-s | 3.37KiB/CPU-s | 0.0 | 0.028258 | False | False |
| syslog_log2metric_splunk_hec_metrics | 87.98KiB/CPU-s | 0.92 | 100.00% | 9.33MiB/CPU-s | 200.51KiB/CPU-s | 2.59KiB/CPU-s | 0.0 | 0.020977 | 9.42MiB/CPU-s | 241.97KiB/CPU-s | 3.12KiB/CPU-s | 0.0 | 0.025084 | False | False |
| syslog_humio_logs | 65.57KiB/CPU-s | 0.71 | 100.00% | 9.02MiB/CPU-s | 290.04KiB/CPU-s | 3.74KiB/CPU-s | 0.0 | 0.031399 | 9.08MiB/CPU-s | 342.15KiB/CPU-s | 4.42KiB/CPU-s | 0.0 | 0.036779 | False | False |
| socket_to_socket_blackhole | 22.33KiB/CPU-s | 0.17 | 99.97% | 12.98MiB/CPU-s | 453.07KiB/CPU-s | 5.85KiB/CPU-s | 0.0 | 0.034078 | 13.0MiB/CPU-s | 158.02KiB/CPU-s | 2.04KiB/CPU-s | 0.0 | 0.011866 | False | False |
| splunk_hec_to_splunk_hec_logs_noack | 3.58KiB/CPU-s | 0.03 | 66.47% | 13.62MiB/CPU-s | 216.35KiB/CPU-s | 2.79KiB/CPU-s | 0.0 | 0.015509 | 13.63MiB/CPU-s | 190.13KiB/CPU-s | 2.45KiB/CPU-s | 0.0 | 0.013626 | False | False |
| http_to_http_noack | 4.03KiB/CPU-s | 0.03 | 53.08% | 13.61MiB/CPU-s | 328.04KiB/CPU-s | 4.23KiB/CPU-s | 0.0 | 0.023533 | 13.62MiB/CPU-s | 281.06KiB/CPU-s | 3.63KiB/CPU-s | 0.0 | 0.020156 | False | False |
| enterprise_http_to_http | 1.88KiB/CPU-s | 0.01 | 29.89% | 13.62MiB/CPU-s | 281.37KiB/CPU-s | 3.63KiB/CPU-s | 0.0 | 0.020178 | 13.62MiB/CPU-s | 254.41KiB/CPU-s | 3.28KiB/CPU-s | 0.0 | 0.018242 | False | False |
| splunk_hec_to_splunk_hec_logs_acks | 1.07KiB/CPU-s | 0.01 | 13.30% | 13.61MiB/CPU-s | 342.69KiB/CPU-s | 4.42KiB/CPU-s | 0.0 | 0.024578 | 13.62MiB/CPU-s | 354.69KiB/CPU-s | 4.58KiB/CPU-s | 0.0 | 0.025437 | False | False |
| file_to_blackhole | -1.88KiB/CPU-s | -0.0 | 7.13% | 54.49MiB/CPU-s | 1.06MiB/CPU-s | 13.94KiB/CPU-s | 0.0 | 0.019361 | 54.49MiB/CPU-s | 1.2MiB/CPU-s | 15.81KiB/CPU-s | 0.0 | 0.021986 | False | False |
| fluent_elasticsearch | -3.72KiB/CPU-s | -0.01 | 67.01% | 45.41MiB/CPU-s | 30.34KiB/CPU-s | 396.4B/CPU-s | 0.0 | 0.000652 | 45.41MiB/CPU-s | 297.57KiB/CPU-s | 3.8KiB/CPU-s | 0.0 | 0.006399 | False | False |
| splunk_hec_indexer_ack_blackhole | -2.12KiB/CPU-s | -0.02 | 36.18% | 13.62MiB/CPU-s | 240.96KiB/CPU-s | 3.11KiB/CPU-s | 0.0 | 0.017277 | 13.62MiB/CPU-s | 252.36KiB/CPU-s | 3.26KiB/CPU-s | 0.0 | 0.018096 | False | False |
| datadog_agent_remap_datadog_logs | -74.61KiB/CPU-s | -0.21 | 99.86% | 33.93MiB/CPU-s | 1.29MiB/CPU-s | 17.06KiB/CPU-s | 0.0 | 0.038071 | 33.86MiB/CPU-s | 1.21MiB/CPU-s | 15.93KiB/CPU-s | 0.0 | 0.035621 | False | False |
| otlp_http_to_blackhole | -5.21KiB/CPU-s | -0.34 | 97.64% | 1.5MiB/CPU-s | 123.77KiB/CPU-s | 1.6KiB/CPU-s | 0.0 | 0.08036 | 1.5MiB/CPU-s | 128.34KiB/CPU-s | 1.66KiB/CPU-s | 0.0 | 0.083616 | False | False |
| syslog_log2metric_humio_metrics | -26.36KiB/CPU-s | -0.43 | 100.00% | 6.02MiB/CPU-s | 313.28KiB/CPU-s | 4.04KiB/CPU-s | 0.0 | 0.050847 | 5.99MiB/CPU-s | 287.96KiB/CPU-s | 3.72KiB/CPU-s | 0.0 | 0.046938 | False | False |
| http_to_http_acks | -43.2KiB/CPU-s | -0.8 | 60.06% | 5.26MiB/CPU-s | 2.77MiB/CPU-s | 36.6KiB/CPU-s | 0.0 | 0.526709 | 5.22MiB/CPU-s | 2.72MiB/CPU-s | 35.89KiB/CPU-s | 0.0 | 0.520725 | True | False |
| datadog_agent_remap_blackhole_acks | -283.94KiB/CPU-s | -0.88 | 100.00% | 31.66MiB/CPU-s | 650.52KiB/CPU-s | 8.4KiB/CPU-s | 0.0 | 0.020067 | 31.38MiB/CPU-s | 889.47KiB/CPU-s | 11.48KiB/CPU-s | 0.0 | 0.02768 | False | False |
| otlp_grpc_to_blackhole | -10.63KiB/CPU-s | -0.99 | 100.00% | 1.04MiB/CPU-s | 34.09KiB/CPU-s | 450.61B/CPU-s | 0.0 | 0.031874 | 1.03MiB/CPU-s | 46.8KiB/CPU-s | 618.44B/CPU-s | 0.0 | 0.044199 | False | False |
| syslog_loki | -113.85KiB/CPU-s | -1.27 | 100.00% | 8.73MiB/CPU-s | 328.8KiB/CPU-s | 4.24KiB/CPU-s | 0.0 | 0.036775 | 8.62MiB/CPU-s | 437.43KiB/CPU-s | 5.65KiB/CPU-s | 0.0 | 0.049556 | False | False |
| syslog_regex_logs2metric_ddmetrics | -47.4KiB/CPU-s | -1.27 | 100.00% | 3.64MiB/CPU-s | 386.21KiB/CPU-s | 4.98KiB/CPU-s | 0.0 | 0.103572 | 3.59MiB/CPU-s | 473.09KiB/CPU-s | 6.11KiB/CPU-s | 0.0 | 0.128504 | True | False |
| datadog_agent_remap_blackhole | -673.05KiB/CPU-s | -2.12 | 100.00% | 31.05MiB/CPU-s | 1.05MiB/CPU-s | 13.84KiB/CPU-s | 0.0 | 0.033722 | 30.39MiB/CPU-s | 1.97MiB/CPU-s | 26.08KiB/CPU-s | 0.0 | 0.064937 | False | False |
| http_to_http_json | -299.4KiB/CPU-s | -2.15 | 100.00% | 13.62MiB/CPU-s | 209.48KiB/CPU-s | 2.7KiB/CPU-s | 0.0 | 0.015016 | 13.33MiB/CPU-s | 472.41KiB/CPU-s | 6.09KiB/CPU-s | 0.0 | 0.034607 | False | False |
| splunk_hec_route_s3 | -280.87KiB/CPU-s | -2.38 | 100.00% | 11.51MiB/CPU-s | 614.4KiB/CPU-s | 7.93KiB/CPU-s | 0.0 | 0.052141 | 11.23MiB/CPU-s | 719.67KiB/CPU-s | 9.29KiB/CPU-s | 0.0 | 0.062566 | False | False |
| http_text_to_http_json | -727.35KiB/CPU-s | -2.9 | 100.00% | 24.51MiB/CPU-s | 781.39KiB/CPU-s | 10.09KiB/CPU-s | 0.0 | 0.031131 | 23.8MiB/CPU-s | 653.33KiB/CPU-s | 8.43KiB/CPU-s | 0.0 | 0.026806 | False | False |

@zamazan4ik zamazan4ik changed the title Feature/mqtt sink from astro feat(mqtt): add MQTT sink Jan 23, 2023
@jszwedko jszwedko assigned StephenWakely and unassigned neuronull Jan 23, 2023
@neuronull
Contributor

Thanks for reviving this @zamazan4ik !

Stephen and I will be reviewing this shortly.

@zamazan4ik
Contributor Author

@jszwedko well, I still cannot resolve the issue of why the integration tests cannot receive the events from the broker. When I test the sink locally with mqttx, it works fine and I see my events. But the integration tests cannot receive events from the broker.

```rust
let mut failures = 0;
let mut message_count = 0;
while failures < 5 && message_count < input.len() {
    if let Ok(try_msg) = tokio::time::timeout(Duration::from_secs(1), eventloop.poll()).await {
```
Contributor

If it helps, I can make the test pass by increasing the timeout.

Suggested change:

```diff
-if let Ok(try_msg) = tokio::time::timeout(Duration::from_secs(1), eventloop.poll()).await {
+if let Ok(try_msg) = tokio::time::timeout(Duration::from_secs(10), eventloop.poll()).await {
```

I'm not sure if this is a cause for concern. Does it imply that this sink has really low throughput?

@neuronull
Contributor

I wonder if the integration test needs some modifications. The rumqttc crate example doesn't specify a tokio timeout while iterating over the event loop, and we might also need to unpack the received event to truly verify receipt of the message.

@StephenWakely StephenWakely mentioned this pull request Mar 16, 2023
@neuronull neuronull added the `meta: awaiting author` label Mar 28, 2023
@gaby

gaby commented Sep 5, 2023

@neuronull @zamazan4ik Any updates on this? There haven't been any updates in months.

@StephenWakely
Contributor

@neuronull @zamazan4ik Any updates on this? There haven't been any updates in months.

It is frustratingly close, but (apart from the merge conflicts now) we just have that failing integration test that needs digging into. We would very happily welcome any contributions if someone wanted to take this on.

@zamazan4ik
Contributor Author

@gaby no updates from my side. I still couldn't resolve the integration test issues on my local dev machine, and I don't work on this PR anymore. If you want to continue work on it, feel free to do so.

@mladedav mladedav mentioned this pull request Nov 29, 2023
@muety

muety commented Dec 13, 2023

I'd love to have MQTT as a sink! Unfortunately, I have no experience with Rust whatsoever, otherwise I could have given it a shot.

@StephenWakely
Contributor

I'd love to have MQTT as a sink! Unfortunately, I have no experience with Rust whatsoever, otherwise I could have given it a shot.

What better way to learn Rust than to solve a complex concurrency timing issue on a large codebase?

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
@zamazan4ik zamazan4ik requested review from a team as code owners January 11, 2024 13:35
@StephenWakely
Contributor

Ok, it looks like the issue is that when you create a client in rumqttc you receive two objects:

  1. The client, through which you publish messages.
  2. An event loop. This event loop needs to be continuously polled to keep messages flowing. It also does things like keeping the connection open and pinging the server.

The original code handled both publishing a message and polling the event loop in a single loop. Essentially:

```rust
loop {
    tokio::select! {
        _ = connection.poll() => {}
        Some(event) = input.next() => { client.publish(..).await; }
    }
}
```

The docs for poll do state:

> NOTE Don't block this while iterating

Due to the way select works, we will be blocking poll while we are handling the publish. (Adding biased doesn't seem to help.) This explains why it can really slow down when sending messages. The problem is fixed by moving the `connection.poll()` loop into a separate tokio task:

```rust
tokio::spawn(async move {
    loop {
        let _ = connection.poll().await;
    }
});
```

The downside is that the actual sending of the messages to MQTT is now separated from the code that publishes them. We are unable to determine exactly when, or even whether, a message was successfully sent.

Because of this, we don't get an id when we publish a message that we could use to tie it to the results from the connection poll. It might be possible to simply assume that the results from poll arrive in the exact order that the messages were published; the original code did make this assumption. I will spend a bit of time looking into how feasible this is.
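For what it's worth, that in-order assumption could be sketched as a simple FIFO of in-flight publishes: push a token per publish, pop one per ack seen by the event loop. This is a hypothetical illustration (the `InFlight` type is not from this PR), and it only holds if the broker really acknowledges in publish order:

```rust
use std::collections::VecDeque;

/// Tracks in-flight publishes and finalizes them as acks arrive,
/// assuming the broker acknowledges strictly in publish order (FIFO).
struct InFlight<T> {
    pending: VecDeque<T>,
}

impl<T> InFlight<T> {
    fn new() -> Self {
        Self { pending: VecDeque::new() }
    }

    /// Record a publish whose outcome we want to report later.
    fn push(&mut self, token: T) {
        self.pending.push_back(token);
    }

    /// On each ack from the event loop, finalize the oldest publish.
    fn ack(&mut self) -> Option<T> {
        self.pending.pop_front()
    }
}
```

The obvious risk is that a single lost or reordered ack silently shifts every later match by one, which is why verifying the broker's ordering guarantee matters before relying on this.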

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
@StephenWakely
Contributor

So that wasn't the full fix. It turned out the tests were checking everything that came from rumqttc, which included pings and other control messages. After updating the tests to check only the actual messages pulled from the MQTT broker, we are back to a failing integration test.

I'm fairly sure the problem is to do with the integration test and not the main sink code. I can create a new executable with this code and it picks up the integration test messages just fine and without delay.

```rust
use rumqttc::{AsyncClient, Event, Incoming, MqttOptions, QoS};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let mut mqttoptions = MqttOptions::new("rumqtt-async", "localhost", 1883);
    mqttoptions.set_keep_alive(Duration::from_secs(5));

    let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
    client.subscribe("test", QoS::AtMostOnce).await.unwrap();

    loop {
        let notification = eventloop.poll().await.unwrap();
        if let Event::Incoming(Incoming::Publish(publish)) = notification {
            let message = String::from_utf8_lossy(&publish.payload);
            dbg!(message);
        }
    }
}
```

So I wonder whether there are issues with running two EventLoop instances in the same application. I've tried running the client in tokio::spawn and tokio::spawn_blocking, to no avail.

@mladedav
Contributor

mladedav commented Feb 1, 2024

I think the issue with the integration test is just a race between the subscriber and the publisher. When I run the tests as they are, more often than not all 10 messages are published before the subscribing client even gets its CONNACK. By the time it subscribes, the messages have already been handled, and since there was no session, they were not stored for the client.

The error I'm seeing with the current code is an assert_eq where one side has ten strings in a slice and the other is an empty slice. Very rarely only a few are missing, where the client seems to have subscribed right between the publishes.

I think this is also why it works for you with a separate binary: you run the subscriber first and only then start the test, so the subscriber gets the messages in time.

I tried moving the code calling run_and_assert_sink_compliance after the tokio::spawn, and threw in a 50 ms tokio::time::sleep between them for good measure. I tried a few times and it seems to be working for me. A better solution would be another channel that signals when a SUBACK has been received by the client, so that publishing starts only after that, but I wouldn't hold it against you if you just wanted to finish this quickly with the sleep.

By the way I've added three comments in the code. Feel free to ignore and resolve any or all of them.
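The "publish only after the subscription is acknowledged" ordering described above can be sketched with plain std channels. This is a hypothetical illustration of the pattern, not the actual test code; in the real test the readiness signal would fire when the subscriber's event loop sees the SUBACK, and the publisher would be the sink under test:

```rust
use std::sync::mpsc;
use std::thread;

// Returns how many messages the subscriber received.
fn run() -> usize {
    let (ready_tx, ready_rx) = mpsc::channel::<()>();
    let (msg_tx, msg_rx) = mpsc::channel::<String>();

    // "Subscriber": signals readiness (stand-in for receiving SUBACK),
    // then collects everything published to it.
    let subscriber = thread::spawn(move || {
        ready_tx.send(()).expect("signal readiness");
        msg_rx.iter().collect::<Vec<_>>()
    });

    // "Publisher": blocks until the subscriber is ready, then publishes.
    ready_rx.recv().expect("wait for subscriber readiness");
    for i in 0..10 {
        msg_tx.send(format!("event {i}")).expect("publish");
    }
    drop(msg_tx); // close the channel so the subscriber's iterator ends

    subscriber.join().expect("join subscriber").len()
}

fn main() {
    assert_eq!(run(), 10);
}
```

Unlike a fixed sleep, this kind of explicit handshake cannot flake under load, which is why it is the more robust fix for the race.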


```rust
impl MqttSinkConfig {
    fn build_connector(&self) -> Result<MqttConnector, MqttError> {
        if self.client_id.is_empty() {
```
Contributor

This should probably also check that the client_id is 1-23 characters long and only contains alphanumeric characters, i.e. [a-zA-Z0-9]{1,23} [MQTT-3.1.3-5].

Also note that, as per [MQTT-3.1.3-6]:

> A Server MAY allow a Client to supply a ClientId that has a length of zero bytes

so empty client ID may be valid although not for every server. I think it's reasonable to disallow it here though for simplicity and compatibility concerns along with having static client ID for reconnections.

It seems that rumqttc panics with an empty string, but also for strings starting with whitespace, so if that was the concern, I think the check should cover at least that.

But I would personally expect a lot of people try to use either a dash or an underscore so it might be better UX to reject all invalid client IDs early.
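A minimal sketch of such an early check, accepting only the portable subset from [MQTT-3.1.3-5] (the function name is hypothetical, not code from this PR):

```rust
/// Hypothetical pre-flight check for the `client_id` option, based on the
/// MQTT 3.1.1 rules quoted above: servers MUST accept IDs of 1-23
/// alphanumeric bytes; anything else is only optionally supported, so
/// rejecting it early gives a clearer error than a rumqttc panic.
fn client_id_is_portable(client_id: &str) -> bool {
    !client_id.is_empty()
        && client_id.len() <= 23
        && client_id.chars().all(|c| c.is_ascii_alphanumeric())
}
```

Since all accepted characters are ASCII, checking `len()` in bytes here is equivalent to checking the character count.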

Contributor

I take this back. The server MUST allow alphanumeric client ID shorter than 24 bytes, but may also allow other IDs.

```rust
let mut options = MqttOptions::new(&self.client_id, &self.host, self.port);
options.set_keep_alive(Duration::from_secs(self.keep_alive.into()));
options.set_clean_session(self.clean_session);
if let (Some(user), Some(password)) = (&self.user, &self.password) {
```
Contributor

This is ignorable, but I believe MQTT allows sending only a password; rumqttc does not seem to allow that, though.

In any case, if a vector user provides either a username or a password and it is ignored because the other half was not provided, I think it should at least be logged as a warning, if not outright considered a bad configuration. Even more so since sending only a password may be exactly what they were trying to do.

```rust
    AtMostOnce,

    /// ExactlyOnce.
    #[derivative(Default)]
```
Contributor

TIL, but I think AtLeastOnce is a better default because that is the guarantee that Vector itself provides. Using ExactlyOnce adds two more MQTT messages for each sent event while not really providing the guarantee, since messages can still be duplicated.
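Under that suggestion, the QoS enum would default to AtLeastOnce instead. A sketch of what that could look like, using plain `#[derive(Default)]` rather than the derivative crate the PR uses (the enum name and doc comments here are illustrative, not the PR's actual definitions):

```rust
/// Supported MQTT Quality of Service levels for the sink.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum MqttQoS {
    /// Fire and forget: no broker acknowledgement.
    AtMostOnce,
    /// Acknowledged delivery; matches Vector's own at-least-once model.
    #[default]
    AtLeastOnce,
    /// Two extra packets (PUBREC/PUBCOMP) per event; end-to-end duplicates
    /// are still possible, so the stronger guarantee is largely illusory.
    ExactlyOnce,
}
```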

```rust
}

fn default_client_id() -> String {
    "vector".to_string()
```
Contributor

This can be dangerous, since two different clients cannot use the same client ID. If you have two vector instances that should send MQTT messages to the same broker, this could lead to the clients fighting over the same ID and disconnecting each other in an endless loop.

I think there must have been the same issue with other sources or sinks; is there some kind of vector function to create a unique client ID? Ideally it should be stable between restarts, but that matters more for a source than a sink.

Otherwise, I think we should use at least something like VectorSink<short hash> and document that multiple mqtt sinks (and sources) connecting to the same broker must not use the same client ID.
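A rough sketch of such a suffixed default (hypothetical, not code from this PR, and notably NOT stable across restarts):

```rust
use std::process;
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical replacement for `default_client_id` that appends the
/// process id and sub-second nanos in hex, making a collision between two
/// Vector instances very unlikely. "vector" (6 bytes) plus at most 8 + 8
/// hex digits keeps the ID within the 23-byte portable limit.
fn default_client_id() -> String {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.subsec_nanos())
        .unwrap_or(0);
    format!("vector{:x}{:x}", process::id(), nanos)
}
```

Because the suffix changes on every restart, a broker holding persistent session state for the old ID would never see that client again; that trade-off is why a stable, user-configurable ID still needs to be documented.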

@mladedav
Contributor

mladedav commented Feb 6, 2024

@StephenWakely @zamazan4ik Would it be fine by you if I took over and reordered the stuff inside the integration test so that this can be merged?

I have pretty much working mqtt source which just needs a bit of cleanup but I'd like to piggy back on some of the stuff introduced here such as the integration tests infra.

@StephenWakely
Contributor

@StephenWakely @zamazan4ik Would it be fine by you if I took over and reordered the stuff inside the integration test so that this can be merged?

I have pretty much working mqtt source which just needs a bit of cleanup but I'd like to piggy back on some of the stuff introduced here such as the integration tests infra.

Yes please! That would be amazing.

@StephenWakely
Contributor

Closing since this is continued in #19813.
