Skip to content

Commit

Permalink
Read acknowledgements send by Riemann
Browse files Browse the repository at this point in the history
When Riemann receive a message, it sends an acknowledgements to the
sending process when the connexion is over TCP (TCP or TLS mode of
operation).  Not reading this acknowledgement make these messages
fill-in the Recv-Q on the system running syslog-ng.

The riemann_communicate() function of riemann-c-client is a wrapper
around riemann_client_send_message_oneshot() that also reads this
acknowledgement if it exist, or mocks it when using UDP.

This patch change the riemann module to rely on riemann_communicate() to
send events and avoid filling-in the Recv-Q.  Because the function
returns a more granular riemann_message_t, update the trace message to
include these details, and adjust the error detection code accordingly.

Fixes #2521

Signed-off-by: Romain Tartière <romain@blogreen.org>
  • Loading branch information
smortex committed Feb 2, 2019
1 parent d85a3ca commit c4ce16c
Showing 1 changed file with 26 additions and 14 deletions.
40 changes: 26 additions & 14 deletions modules/riemann/riemann-worker.c
Expand Up @@ -26,7 +26,7 @@
#include "riemann-worker.h"
#include "scratch-buffers.h"

#include <riemann/riemann-client.h>
#include <riemann/simple.h>
#include <stdlib.h>

static void
Expand Down Expand Up @@ -315,23 +315,15 @@ riemann_worker_flush(LogThreadedDestWorker *s)
RiemannDestWorker *self = (RiemannDestWorker *) s;
RiemannDestDriver *owner = (RiemannDestDriver *) self->super.owner;
riemann_message_t *message;
int r;
riemann_message_t *r;

if (self->event.n == 0)
return LTR_SUCCESS;

message = riemann_message_new();

riemann_message_set_events_n(message, self->event.n, self->event.list);
r = riemann_client_send_message_oneshot(self->client, message);

msg_trace("riemann: flushing messages to Riemann server",
evt_tag_str("server", owner->server),
evt_tag_int("port", owner->port),
evt_tag_int("batch_size", self->event.n),
evt_tag_int("result", r),
evt_tag_str("driver", owner->super.super.super.id),
log_pipe_location_tag(&owner->super.super.super.super));
r = riemann_communicate(self->client, message);

/*
* riemann_client_send_message_oneshot() will free self->event.list,
Expand All @@ -341,10 +333,30 @@ riemann_worker_flush(LogThreadedDestWorker *s)
self->event.n = 0;
self->event.list = (riemann_event_t **) malloc(sizeof (riemann_event_t *) *
MAX(1, owner->super.batch_lines));
if (r != 0)
return LTR_ERROR;
if (!r)
{
return LTR_ERROR;
}

msg_trace("riemann: flushing messages to Riemann server",
evt_tag_str("server", owner->server),
evt_tag_int("port", owner->port),
evt_tag_int("batch_size", self->event.n),
evt_tag_int("ok", r->ok),
evt_tag_str("error", r->error),
evt_tag_str("driver", owner->super.super.super.id),
log_pipe_location_tag(&owner->super.super.super.super));

if ((r->error) || (r->has_ok && !r->ok))
{
riemann_message_free(r);
return LTR_ERROR;
}
else
return LTR_SUCCESS;
{
riemann_message_free(r);
return LTR_SUCCESS;
}
}

static LogThreadedResult
Expand Down

0 comments on commit c4ce16c

Please sign in to comment.