From d55f53e4763e0f11f29ac5c9c9159ae797a209a9 Mon Sep 17 00:00:00 2001 From: Ryan Ohnemus Date: Sun, 14 Jan 2024 09:45:57 -0600 Subject: [PATCH] in_kubernetes_events: consolidate record timestamp logic (#8323) Signed-off-by: ryanohnemus Signed-off-by: ahspw --- .../in_kubernetes_events/kubernetes_events.c | 80 +++++++------------ .../in_kubernetes_events/kubernetes_events.h | 3 +- .../kubernetes_events_conf.c | 17 ---- .../kubernetes_events_conf.h | 1 - 4 files changed, 28 insertions(+), 73 deletions(-) diff --git a/plugins/in_kubernetes_events/kubernetes_events.c b/plugins/in_kubernetes_events/kubernetes_events.c index 4f71095be6b..6231becb0ad 100644 --- a/plugins/in_kubernetes_events/kubernetes_events.c +++ b/plugins/in_kubernetes_events/kubernetes_events.c @@ -167,19 +167,6 @@ static int refresh_token_if_needed(struct k8s_events *ctx) return 0; } -static int timestamp_lookup(struct k8s_events *ctx, char *ts, struct flb_time *time) -{ - struct flb_tm tm = { 0 }; - - if (flb_strptime(ts, "%Y-%m-%dT%H:%M:%SZ", &tm) == NULL) { - return -1; - } - - time->tm.tv_sec = flb_parser_tm2time(&tm); - time->tm.tv_nsec = 0; - - return 0; -} static msgpack_object *record_get_field_ptr(msgpack_object *obj, const char *fieldname) { @@ -221,7 +208,7 @@ static int record_get_field_sds(msgpack_object *obj, const char *fieldname, flb_ return 0; } -static int record_get_field_time(msgpack_object *obj, const char *fieldname, time_t *val) +static int record_get_field_time(msgpack_object *obj, const char *fieldname, struct flb_time *val) { msgpack_object *v; struct flb_tm tm = { 0 }; @@ -238,7 +225,9 @@ static int record_get_field_time(msgpack_object *obj, const char *fieldname, tim return -2; } - *val = mktime(&tm.tm); + val->tm.tv_sec = flb_parser_tm2time(&tm); + val->tm.tv_nsec = 0; + return 0; } @@ -271,7 +260,7 @@ static int record_get_field_uint64(msgpack_object *obj, const char *fieldname, u return -1; } -static int item_get_timestamp(msgpack_object *obj, time_t *event_time) +static int item_get_timestamp(msgpack_object *obj, struct flb_time *event_time) { int ret; msgpack_object *metadata; @@ -301,25 +290,19 @@ static int item_get_timestamp(msgpack_object *obj, time_t *event_time) return FLB_FALSE; } -static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj) +static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj, + time_t* event_time) { int ret; - time_t event_time; time_t now; msgpack_object *metadata; flb_sds_t uid; uint64_t resource_version; - ret = item_get_timestamp(obj, &event_time); - if (ret == -FLB_FALSE) { - flb_plg_error(ctx->ins, "Cannot get timestamp for item in response"); - return FLB_FALSE; - } - now = (time_t)(cfl_time_now() / 1000000000); - if (event_time < (now - ctx->retention_time)) { + if (*event_time < (now - ctx->retention_time)) { flb_plg_debug(ctx->ins, "Item is older than retention_time: %ld < %ld", - event_time, (now - ctx->retention_time)); + *event_time, (now - ctx->retention_time)); return FLB_TRUE; } @@ -373,7 +356,7 @@ static bool check_event_is_filtered(struct k8s_events *ctx, msgpack_object *obj) // check if this is an old event. if (ctx->last_resource_version && resource_version <= ctx->last_resource_version) { - flb_plg_debug(ctx->ins, "skipping old object: %lu (< %lu)", resource_version, + flb_plg_debug(ctx->ins, "skipping old object: %llu (< %llu)", resource_version, ctx->last_resource_version); flb_sds_destroy(uid); return FLB_TRUE; @@ -393,7 +376,6 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, size_t buf_size; size_t off = 0; struct flb_time ts; - struct flb_ra_value *rval; uint64_t resource_version; msgpack_unpacked result; msgpack_object root; @@ -498,7 +480,14 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, goto msg_error; } - if (check_event_is_filtered(ctx, item) == FLB_TRUE) { + /* get event timestamp */ + ret = item_get_timestamp(item, &ts); + if (ret == FLB_FALSE) { + flb_plg_error(ctx->ins, "cannot retrieve event timestamp"); + goto msg_error; + } + + if (check_event_is_filtered(ctx, item, &ts) == FLB_TRUE) { continue; } @@ -508,21 +497,6 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, } #endif - /* get event timestamp */ - rval = flb_ra_get_value_object(ctx->ra_timestamp, *item); - if (!rval || rval->type != FLB_RA_STRING) { - flb_plg_error(ctx->ins, "cannot retrieve event timestamp"); - goto msg_error; - } - - /* convert timestamp */ - ret = timestamp_lookup(ctx, rval->val.string, &ts); - if (ret == -1) { - flb_plg_error(ctx->ins, "cannot lookup event timestamp"); - flb_ra_key_value_destroy(rval); - goto msg_error; - } - /* encode content as a log event */ flb_log_event_encoder_begin_record(ctx->encoder); flb_log_event_encoder_set_timestamp(ctx->encoder, &ts); @@ -531,9 +505,8 @@ static int process_events(struct k8s_events *ctx, char *in_data, size_t in_size, if (ret == FLB_EVENT_ENCODER_SUCCESS) { ret = flb_log_event_encoder_commit_record(ctx->encoder); } else { - flb_plg_warn(ctx->ins, "unable to encode: %lu", resource_version); + flb_plg_warn(ctx->ins, "unable to encode: %llu", resource_version); } - flb_ra_key_value_destroy(rval); } if (ctx->encoder->output_length > 0) { @@ -616,7 +589,7 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i { int ret; uint64_t resource_version; - time_t last; + struct flb_time last; msgpack_object *meta; flb_sds_t uid; @@ -654,21 +627,21 @@ static int k8s_events_sql_insert_event(struct k8s_events *ctx, msgpack_object *i /* Bind parameters */ sqlite3_bind_text(ctx->stmt_insert_kubernetes_event, 1, uid, -1, 0); sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 2, resource_version); - sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 3, (int64_t)last); + sqlite3_bind_int64(ctx->stmt_insert_kubernetes_event, 3, flb_time_to_nanosec(&last)); /* Run the insert */ ret = sqlite3_step(ctx->stmt_insert_kubernetes_event); if (ret != SQLITE_DONE) { sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event); sqlite3_reset(ctx->stmt_insert_kubernetes_event); - flb_plg_error(ctx->ins, "cannot execute insert kubernetes event %s inode=%lu", + flb_plg_error(ctx->ins, "cannot execute insert kubernetes event %s inode=%llu", uid, resource_version); flb_sds_destroy(uid); return -1; } flb_plg_debug(ctx->ins, - "inserted k8s event: uid=%s, resource_version=%lu, last=%ld", + "inserted k8s event: uid=%s, resource_version=%llu, last=%ld", uid, resource_version, last); sqlite3_clear_bindings(ctx->stmt_insert_kubernetes_event); sqlite3_reset(ctx->stmt_insert_kubernetes_event); @@ -745,7 +718,7 @@ static int k8s_events_collect(struct flb_input_instance *ins, } while(continue_token != NULL); if (max_resource_version > ctx->last_resource_version) { - flb_plg_debug(ctx->ins, "set last resourceVersion=%lu", max_resource_version); + flb_plg_debug(ctx->ins, "set last resourceVersion=%llu", max_resource_version); ctx->last_resource_version = max_resource_version; } @@ -889,10 +862,11 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct k8s_events, namespace), "kubernetes namespace to get events from, gets event from all namespaces by default." }, + { - FLB_CONFIG_MAP_STR, "timestamp_key", K8S_EVENTS_RA_TIMESTAMP, + FLB_CONFIG_MAP_STR, "timestamp_key", NULL, 0, FLB_TRUE, offsetof(struct k8s_events, timestamp_key), - "Record accessor for the timestamp from the event. Default is $lastTimestamp." + "Deprecated. To be removed in v3.0" }, #ifdef FLB_HAVE_SQLDB diff --git a/plugins/in_kubernetes_events/kubernetes_events.h b/plugins/in_kubernetes_events/kubernetes_events.h index f22816bc392..a4bcbf19d11 100644 --- a/plugins/in_kubernetes_events/kubernetes_events.h +++ b/plugins/in_kubernetes_events/kubernetes_events.h @@ -74,11 +74,10 @@ struct k8s_events { struct flb_log_event_encoder *encoder; - /* timestamp key */ + /* timestamp key - deprecated, to be removed in v3.0 */ flb_sds_t timestamp_key; /* record accessor */ - struct flb_record_accessor *ra_timestamp; struct flb_record_accessor *ra_resource_version; /* others */ diff --git a/plugins/in_kubernetes_events/kubernetes_events_conf.c b/plugins/in_kubernetes_events/kubernetes_events_conf.c index c00d7658b9b..bc14b0ad09f 100644 --- a/plugins/in_kubernetes_events/kubernetes_events_conf.c +++ b/plugins/in_kubernetes_events/kubernetes_events_conf.c @@ -135,7 +135,6 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins) int ret; const char *p; const char *url; - const char *timestampKey; const char *tmp; struct k8s_events *ctx = NULL; pthread_mutexattr_t attr; @@ -165,19 +164,6 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins) return NULL; } - /* Record accessor pattern */ - timestampKey = flb_input_get_property("timestamp_key", ins); - if (!timestampKey ) { - timestampKey = K8S_EVENTS_RA_TIMESTAMP; - } - ctx->ra_timestamp = flb_ra_create(timestampKey, FLB_TRUE); - if (!ctx->ra_timestamp) { - flb_plg_error(ctx->ins, - "could not create record accessor for record timestamp"); - k8s_events_conf_destroy(ctx); - return NULL; - } - ctx->ra_resource_version = flb_ra_create(K8S_EVENTS_RA_RESOURCE_VERSION, FLB_TRUE); if (!ctx->ra_resource_version) { flb_plg_error(ctx->ins, "could not create record accessor for resource version"); @@ -289,9 +275,6 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins) void k8s_events_conf_destroy(struct k8s_events *ctx) { - if (ctx->ra_timestamp) { - flb_ra_destroy(ctx->ra_timestamp); - } if (ctx->ra_resource_version) { flb_ra_destroy(ctx->ra_resource_version); diff --git a/plugins/in_kubernetes_events/kubernetes_events_conf.h b/plugins/in_kubernetes_events/kubernetes_events_conf.h index 6196c3da70d..82400877ca9 100644 --- a/plugins/in_kubernetes_events/kubernetes_events_conf.h +++ b/plugins/in_kubernetes_events/kubernetes_events_conf.h @@ -38,7 +38,6 @@ #define K8S_EVENTS_KUBE_TOKEN "/var/run/secrets/kubernetes.io/serviceaccount/token" #define K8S_EVENTS_KUBE_CA "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" -#define K8S_EVENTS_RA_TIMESTAMP "$lastTimestamp" #define K8S_EVENTS_RA_RESOURCE_VERSION "$metadata['resourceVersion']" struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins);