Skip to content

Commit

Permalink
filter_multiline: Add Fluentd Log Driver Partial Message Support (flu…
Browse files Browse the repository at this point in the history
…ent#5285)

* filter_multiline: implement Docker partial_message support

Signed-off-by: Wesley Pettit <wppttt@amazon.com>
Signed-off-by: Manal Geries <mgeriesa@gmail.com>
  • Loading branch information
PettitWesley authored and mgeriesa committed Oct 25, 2022
1 parent 5b73e8c commit 7f997e0
Show file tree
Hide file tree
Showing 6 changed files with 891 additions and 42 deletions.
3 changes: 2 additions & 1 deletion plugins/filter_multiline/CMakeLists.txt
@@ -1,4 +1,5 @@
set(src
ml.c)
ml.c
ml_concat.c)

FLB_PLUGIN(filter_multiline "${src}" "")
300 changes: 263 additions & 37 deletions plugins/filter_multiline/ml.c
Expand Up @@ -25,8 +25,10 @@
#include <fluent-bit/flb_storage.h>
#include <fluent-bit/multiline/flb_ml.h>
#include <fluent-bit/multiline/flb_ml_parser.h>
#include <fluent-bit/flb_scheduler.h>

#include "ml.h"
#include "ml_concat.h"

static struct ml_stream *get_by_id(struct ml_ctx *ctx, uint64_t stream_id);

Expand Down Expand Up @@ -196,6 +198,7 @@ static int cb_ml_init(struct flb_filter_instance *ins,
ctx->ins = ins;
ctx->debug_flush = FLB_FALSE;
ctx->config = config;
ctx->timer_created = FLB_FALSE;

/*
* Config map is not yet set at this point in the code
Expand All @@ -206,6 +209,26 @@ static int cb_ml_init(struct flb_filter_instance *ins,
if (tmp) {
ctx->use_buffer = flb_utils_bool(tmp);
}
ctx->partial_mode = FLB_FALSE;
tmp = (char *) flb_filter_get_property("mode", ins);
if (tmp != NULL) {
if (strcasecmp(tmp, FLB_MULTILINE_MODE_PARTIAL_MESSAGE) == 0) {
ctx->partial_mode = FLB_TRUE;
} else if (strcasecmp(tmp, FLB_MULTILINE_MODE_PARSER) == 0) {
ctx->partial_mode = FLB_FALSE;
} else {
flb_plg_error(ins, "'Mode' must be '%s' or '%s'",
FLB_MULTILINE_MODE_PARTIAL_MESSAGE,
FLB_MULTILINE_MODE_PARSER);
return -1;
}
}

if (ctx->partial_mode == FLB_TRUE && ctx->use_buffer == FLB_FALSE) {
flb_plg_error(ins, "'%s' 'Mode' requires 'Buffer' to be 'On'",
FLB_MULTILINE_MODE_PARTIAL_MESSAGE);
}

if (ctx->use_buffer == FLB_FALSE) {
/* Init buffers */
msgpack_sbuffer_init(&ctx->mp_sbuf);
Expand Down Expand Up @@ -251,10 +274,25 @@ static int cb_ml_init(struct flb_filter_instance *ins,
flb_free(ctx);
return -1;
}

/* Set plugin context */
flb_filter_set_context(ins, ctx);

if (ctx->key_content == NULL && ctx->partial_mode == FLB_TRUE) {
flb_plg_error(ins, "'Mode' '%s' requires 'multiline.key_content'",
FLB_MULTILINE_MODE_PARTIAL_MESSAGE);
flb_free(ctx);
return -1;
}

if (ctx->partial_mode == FLB_FALSE && mk_list_size(ctx->multiline_parsers) == 0) {
flb_plg_error(ins, "The default 'Mode' '%s' requires at least one 'multiline.parser'",
FLB_MULTILINE_MODE_PARSER);
flb_free(ctx);
return -1;
}


if (ctx->use_buffer == FLB_TRUE) {
/*
* Emitter Storage Type: the emitter input plugin to be created by default
Expand Down Expand Up @@ -292,43 +330,46 @@ static int cb_ml_init(struct flb_filter_instance *ins,
#endif
}

/* Create multiline context */
ctx->m = flb_ml_create(config, ctx->ins->name);
if (!ctx->m) {
/*
* we don't free the context since upon init failure, the exit
* callback will be triggered with our context set above.
*/
return -1;
}

/* Load the parsers/config */
ret = multiline_load_parsers(ctx);
if (ret == -1) {
return -1;
}

mk_list_init(&ctx->ml_streams);
mk_list_init(&ctx->split_message_packers);

if (ctx->use_buffer == FLB_TRUE) {
if (ctx->partial_mode == FLB_FALSE) {
/* Create multiline context */
ctx->m = flb_ml_create(config, ctx->ins->name);
if (!ctx->m) {
/*
* we don't free the context since upon init failure, the exit
* callback will be triggered with our context set above.
*/
return -1;
}

ctx->m->flush_ms = ctx->flush_ms;
ret = flb_ml_auto_flush_init(ctx->m);
/* Load the parsers/config */
ret = multiline_load_parsers(ctx);
if (ret == -1) {
return -1;
}
} else {
/* Create a stream for this file */
len = strlen(ins->name);
ret = flb_ml_stream_create(ctx->m,
ins->name, len,
flush_callback, ctx,
&stream_id);
if (ret != 0) {
flb_plg_error(ctx->ins, "could not create multiline stream");
return -1;

if (ctx->use_buffer == FLB_TRUE) {

ctx->m->flush_ms = ctx->flush_ms;
ret = flb_ml_auto_flush_init(ctx->m);
if (ret == -1) {
return -1;
}
} else {
/* Create a stream for this file */
len = strlen(ins->name);
ret = flb_ml_stream_create(ctx->m,
ins->name, len,
flush_callback, ctx,
&stream_id);
if (ret != 0) {
flb_plg_error(ctx->ins, "could not create multiline stream");
return -1;
}
ctx->stream_id = stream_id;
}
ctx->stream_id = stream_id;
}

return 0;
Expand Down Expand Up @@ -450,6 +491,176 @@ static struct ml_stream *get_or_create_stream(struct ml_ctx *ctx,
return stream;
}

static void partial_timer_cb(struct flb_config *config, void *data)
{
struct ml_ctx *ctx = data;
(void) config;
struct mk_list *tmp;
struct mk_list *head;
struct split_message_packer *packer;
unsigned long long now;
unsigned long long diff;
int ret;

now = ml_current_timestamp();

mk_list_foreach_safe(head, tmp, &ctx->split_message_packers) {
packer = mk_list_entry(head, struct split_message_packer, _head);

diff = now - packer->last_write_time;
if (diff <= ctx->flush_ms) {
continue;
}

mk_list_del(&packer->_head);
ml_split_message_packer_complete(packer);
/* re-emit record with original tag */
flb_plg_trace(ctx->ins, "emitting from %s to %s", packer->input_name, packer->tag);
ret = in_emitter_add_record(packer->tag, flb_sds_len(packer->tag),
packer->mp_sbuf.data, packer->mp_sbuf.size,
ctx->ins_emitter);
if (ret < 0) {
/* this shouldn't happen in normal execution */
flb_plg_warn(ctx->ins, "Couldn't send concatenated record of size %zu bytes to in_emitter %s",
packer->mp_sbuf.size, ctx->ins_emitter->name);
}
ml_split_message_packer_destroy(packer);
}

}

static int ml_filter_partial(const void *data, size_t bytes,
const char *tag, int tag_len,
void **out_buf, size_t *out_bytes,
struct flb_filter_instance *f_ins,
struct flb_input_instance *i_ins,
void *filter_context,
struct flb_config *config)
{
int ret;
int ok = MSGPACK_UNPACK_SUCCESS;
size_t off = 0;
(void) f_ins;
(void) config;
msgpack_unpacked result;
msgpack_object *obj;
struct ml_ctx *ctx = filter_context;
struct flb_time tm;
msgpack_sbuffer tmp_sbuf;
msgpack_packer tmp_pck;
int partial_records = 0;
int total_records = 0;
int return_records = 0;
int partial = FLB_FALSE;
int is_last_partial = FLB_FALSE;
struct split_message_packer *packer;
char *partial_id_str = NULL;
size_t partial_id_size = 0;
struct flb_sched *sched;

/*
* create a timer that will run periodically and check if pending buffers
* have expired
* this is created once on the first flush
*/
if (ctx->timer_created == FLB_FALSE) {
flb_plg_debug(ctx->ins,
"Creating flush timer with frequency %dms",
ctx->flush_ms);

sched = flb_sched_ctx_get();

ret = flb_sched_timer_cb_create(sched, FLB_SCHED_TIMER_CB_PERM,
ctx->flush_ms / 2, partial_timer_cb,
ctx, NULL);
if (ret < 0) {
flb_plg_error(ctx->ins, "Failed to create flush timer");
} else {
ctx->timer_created = FLB_TRUE;
}
}

/*
* Create temporary msgpack buffer
* for non-partial messages which are passed on as-is
*/
msgpack_sbuffer_init(&tmp_sbuf);
msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);

msgpack_unpacked_init(&result);
while (msgpack_unpack_next(&result, data, bytes, &off) == ok) {
total_records++;
flb_time_pop_from_msgpack(&tm, &result, &obj);

partial = ml_is_partial(obj);
if (partial == FLB_TRUE) {
partial_records++;
ret = ml_get_partial_id(obj, &partial_id_str, &partial_id_size);
if (ret == -1) {
flb_plg_warn(ctx->ins, "Could not find partial_id but partial_message key is FLB_TRUE for record with tag %s", tag);
/* handle this record as non-partial */
partial_records--;
goto pack_non_partial;
}
packer = ml_get_packer(&ctx->split_message_packers, tag,
i_ins->name, partial_id_str, partial_id_size);
if (packer == NULL) {
flb_plg_trace(ctx->ins, "Found new partial record with tag %s", tag);
packer = ml_create_packer(tag, i_ins->name, partial_id_str, partial_id_size,
obj, ctx->key_content, &tm);
if (packer == NULL) {
flb_plg_warn(ctx->ins, "Could not create packer for partial record with tag %s", tag);
/* handle this record as non-partial */
partial_records--;
goto pack_non_partial;
}
mk_list_add(&packer->_head, &ctx->split_message_packers);
}
ret = ml_split_message_packer_write(packer, obj, ctx->key_content);
if (ret < 0) {
flb_plg_warn(ctx->ins, "Could not append content for partial record with tag %s", tag);
/* handle this record as non-partial */
partial_records--;
goto pack_non_partial;
}
is_last_partial = ml_is_partial_last(obj);
if (is_last_partial == FLB_TRUE) {
/* emit the record in this filter invocation */
return_records++;
ml_split_message_packer_complete(packer);
ml_append_complete_record(packer->mp_sbuf.data, packer->mp_sbuf.size, &tmp_pck);
mk_list_del(&packer->_head);
ml_split_message_packer_destroy(packer);
}
} else {

pack_non_partial:
return_records++;
/* record passed from filter as-is */
msgpack_pack_array(&tmp_pck, 2);
flb_time_append_to_msgpack(&tm, &tmp_pck, 0);
msgpack_pack_object(&tmp_pck, *obj);
}

}

msgpack_unpacked_destroy(&result);

if (partial_records == 0) {
/* if no records were partial, we didn't modify the chunk */
msgpack_sbuffer_destroy(&tmp_sbuf);
return FLB_FILTER_NOTOUCH;
} else if (return_records > 0) {
/* some new records can be returned now, return a new buffer */
*out_buf = tmp_sbuf.data;
*out_bytes = tmp_sbuf.size;
} else {
/* no records to return right now, free buffer */
msgpack_sbuffer_destroy(&tmp_sbuf);
}
return FLB_FILTER_MODIFIED;
}

static int cb_ml_filter(const void *data, size_t bytes,
const char *tag, int tag_len,
void **out_buf, size_t *out_bytes,
Expand All @@ -461,8 +672,6 @@ static int cb_ml_filter(const void *data, size_t bytes,
int ret;
int ok = MSGPACK_UNPACK_SUCCESS;
size_t off = 0;
(void) out_buf;
(void) out_bytes;
(void) f_ins;
(void) config;
msgpack_unpacked result;
Expand All @@ -473,6 +682,20 @@ static int cb_ml_filter(const void *data, size_t bytes,
struct flb_time tm;
struct ml_stream *stream;

if (i_ins == ctx->ins_emitter) {
flb_plg_trace(ctx->ins, "not processing records from the emitter");
return FLB_FILTER_NOTOUCH;
}

/* 'partial_message' mode */
if (ctx->partial_mode == FLB_TRUE) {
return ml_filter_partial(data, bytes, tag, tag_len,
out_buf, out_bytes,
f_ins, i_ins,
filter_context, config);
}

/* 'parser' mode */
if (ctx->use_buffer == FLB_FALSE) {
/* reset mspgack size content */
ctx->mp_sbuf.size = 0;
Expand Down Expand Up @@ -518,10 +741,6 @@ static int cb_ml_filter(const void *data, size_t bytes,
return FLB_FILTER_NOTOUCH;

} else { /* buffered mode */
if (i_ins == ctx->ins_emitter) {
flb_plg_trace(ctx->ins, "not processing record from the emitter");
return FLB_FILTER_NOTOUCH;
}

stream = get_or_create_stream(ctx, i_ins, tag, tag_len);

Expand Down Expand Up @@ -595,6 +814,13 @@ static struct flb_config_map config_map[] = {
"With buffer off, this filter will not work with most inputs, except tail."
},

{
FLB_CONFIG_MAP_STR, "mode", "parser",
0, FLB_TRUE, offsetof(struct ml_ctx, mode),
"Mode can be 'parser' for regex concat, or 'partial_message' to "
"concat split docker logs."
},

{
FLB_CONFIG_MAP_INT, "flush_ms", "2000",
0, FLB_TRUE, offsetof(struct ml_ctx, flush_ms),
Expand Down

0 comments on commit 7f997e0

Please sign in to comment.