From 8ae2fe745a600d4e7e1f9b220c32aa9852a8f556 Mon Sep 17 00:00:00 2001
From: knadupur
Date: Wed, 12 Apr 2023 16:31:37 +0530
Subject: [PATCH] v2.0.5

---
 CHANGELOG.md                                |  12 +
 FAQ.md                                      | 227 ++++++++++++++++++
 README.md                                   |  43 +++-
 THIRD_PARTY_LICENSES.txt                    |  41 ++++
 examples/apache.conf                        |  81 ++++---
 examples/kafka.conf                         |  83 ++++---
 examples/multi_worker.conf                  | 189 +++++++++++++++
 examples/syslog.conf                        |  87 +++----
 fluent-plugin-oci-logging-analytics.gemspec |   3 +-
 lib/fluent/metrics/metricsLabels.rb         |   3 +-
 lib/fluent/metrics/prometheusMetrics.rb     |  19 +-
 .../plugin/out_oci-logging-analytics.rb     | 223 +++++++++--------
 12 files changed, 772 insertions(+), 239 deletions(-)
 create mode 100644 FAQ.md
 create mode 100644 examples/multi_worker.conf

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 09fe2bc..65841f5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,17 @@
 # Change Log

+## 2.0.5 - 2023-04-12
+### Added
+- Prometheus metrics support for multi worker configuration.
+- 'FAQ' section to help customers triage commonly encountered issues.
+### Changed
+- Support for both 'memory' and 'file' buffer types. The recommended and default buffer type is still 'file'.
+- Use of Yajl over the default JSON library, to handle multi-byte character logs gracefully.
+- Plugin log defaults to STDOUT.
+  - The 'oci-logging-analytics.log' file will no longer be created when 'plugin_log_location' in the match block section is not provided explicitly.
+### Bug fix
+- 'tag' field is no longer mandatory in the filter block.

 ## 2.0.4 - 2022-06-20
 ### Changed
 - Updated prometheus-client dependency to v4.0.0.
diff --git a/FAQ.md b/FAQ.md
new file mode 100644
index 0000000..83a9a28
--- /dev/null
+++ b/FAQ.md
@@ -0,0 +1,227 @@
+# Frequently Asked Questions
+
+- [Why am I getting this error - "Error occurred while initializing LogAnalytics Client" ?](#why-am-i-getting-this-error---error-occurred-while-initializing-loganalytics-client-)
+- [Why am I getting this error - "Error occurred while parsing oci_la_log_set" ?](#why-am-i-getting-this-error---error-occurred-while-parsing-oci_la_log_set-)
+- [Why am I getting this error - "Error while uploading the payload" or "execution expired" or "status : 0" ?](#why-am-i-getting-this-error---error-while-uploading-the-payload-or--execution-expired-or--status--0-)
+- [How to find fluentd/output plugin logs ?](#how-to-find-fluentdoutput-plugin-logs-)
+- [Fluentd successfully uploaded data, but still it is not visible in LogExplorer. How to triage ?](#fluentd-successfully-uploaded-data-but-still-it-is-not-visible-in-logexplorer-how-to-triage-)
+- [How to extract specific K8s metadata field that I am interested in to a Logging Analytics field ?](#how-to-extract-specific-k8s-metadata-field-that-i-am-interested-in-to-a-logging-analytics-field-)
+- [How to make Fluentd process the log data from the beginning of a file when using tail input plugin ?](#how-to-make-fluentd-process-the-log-data-from-the-beginning-of-a-file-when-using-tail-input-plugin-)
+- [How to make Fluentd process the last line from the file when using tail input plugin ?](#how-to-make-fluentd-process-the-last-line-from-the-file-when-using-tail-input-plugin-)
+- [In multi worker setup, prometheus is not displaying all the worker's metrics. How to fix it ?](#in-multi-worker-setup-prometheus-is-not-displaying-all-the-workers-metrics-how-to-fix-it-)
+- [Why am I getting this error - "ConcatFilter::TimeoutError" ?](#why-am-i-getting-this-error---concatfiltertimeouterror-)
+- [Fluentd is failing to parse the log data. What can be the reason ?](#fluentd-is-failing-to-parse-the-log-data-what-can-be-the-reason-)
+
+
+## Why am I getting this error - "Error occurred while initializing LogAnalytics Client" ?
+- This occurs mostly due to an incorrect authorization type/configuration being provided.
+- This plugin uses either config based auth or Instance principal based auth, with Instance principal based auth being the default.
+- For config based auth, ensure valid "config_file_location" and "profile" details are provided in the match block as shown below.
+   ```
+   <match **>
+      @type oci-logging-analytics
+      config_file_location <config_file_location>  #REPLACE_ME
+      profile_name DEFAULT
+   </match>
+   ```
+
+## Why am I getting this error - "Error occurred while parsing oci_la_log_set" ?
+- The provided regex does not match the incoming key, and the regex might need a correction.
+- This might also be expected behaviour when, with the configured regex, not all keys need to be matched. In such scenarios we fall back to using the logSet parameter set via oci_la_log_set.
+- You may also apply logSet using the alternative approach documented [here](https://docs.oracle.com/en-us/iaas/logging-analytics/doc/manage-log-partitioning.html#LOGAN-GUID-2EC8EEDE-9BBD-4872-8083-A44F77611524)
+
+## Why am I getting this error - "Error while uploading the payload" or "execution expired" or "status : 0" ?
+- Sample logs:
+   ```
+   I, [2023-01-18T10:39:49.483789 #11]  INFO -- : Received new chunk, started processing ...
+   I, [2023-01-18T10:39:49.495771 #11]  INFO -- : Generating payload with 31 records for oci_la_log_group_id: ocid1.loganalyticsloggroup.oc1.iad.amaaaaaa....
+   E, [2023-01-18T10:39:59.502747 #11] ERROR -- : oci upload exception : Error while uploading the payload. { 'message': 'execution expired', 'status': 0, 'opc-request-id':'C37D1DE643E24D778FC5FA22835FE024', 'response-body': '' }
+   ```
+
+- This occurs due to connectivity issues with the OCI endpoint. Ensure the proxy details, if configured, are valid, and that you have network connectivity to reach the OCI endpoint from where you are running fluentd.
+
+## How to find fluentd/output plugin logs ?
+- By default (starting from version 2.0.5), oci logging analytics output plugin logs go to STDOUT and are available as part of the fluentd logs itself, unless explicitly configured using the following plugin parameter.
+   ```
+   plugin_log_location "#{ENV['FLUENT_OCI_LOG_LOCATION'] || '/var/log'}"
+   # Log file named 'oci-logging-analytics.log' will be generated in the above location
+   ```
+- For td-agent (rpm/deb) based setups, the fluentd logs are located at /var/log/td-agent/td-agent.log
+
+
+## Fluentd successfully uploaded data, but still it is not visible in LogExplorer. How to triage ?
+- Check if the selected time range in log explorer is in line with the actual log messages' timestamps.
+- Check again after some time - as the processing of the data happens asynchronously, in some cases it may take a while for the data to be reflected in log explorer.
+  - The processing of the data may fail in the subsequent validations which happen at OCI.
+  - Check for any [processing errors](https://docs.oracle.com/en-us/iaas/logging-analytics/doc/troubleshoot-ingestion-pipeline.html).
+  - If the issue still persists, raise an SR providing the following information: tenancy_ocid, region, sample opc-request-id/opc-object-id.
+  - You may get the opc-request-id/opc-object-id from the fluentd output plugin log. A sample log for a successful upload looks like below,
+
+   ```
+   I, [2023-01-18T10:39:49.483789 #11]  INFO -- : Received new chunk, started processing ...
+   I, [2023-01-18T10:39:49.495771 #11]  INFO -- : Generating payload with 30 records for oci_la_log_group_id: ocid1.loganalyticsloggroup.oc1.iad.amaaaaaa....
+   I, [2023-01-18T10:39:59.502747 #11]  INFO -- : The payload has been successfully uploaded to logAnalytics -
+                                                  oci_la_log_group_id: ocid1.loganalyticsloggroup.oc1.iad.amaaaaaa....,
+                                                  ConsumedRecords: #30,
+                                                  opc-request-id: 'C37D1DE643E24D778FC5FA22835FE024',
+                                                  opc-object-id: 'C37D1DE643E24D778FC5FA22835FE024-D37D1DE643E24D778FC5FA22835FE024'
+   ```
+## How to extract specific K8s metadata field that I am interested in to a Logging Analytics field ?
+- This scenario typically arises when collecting the logs from Kubernetes clusters and using kubernetes_metadata_filter to enrich the data at fluentd.
+- By default, the fluentd output plugin fetches the fields "container_name", "namespace_name", "pod_name", "container_image", "host" from the kubernetes metadata when available, and maps them to the fields "Container", "Namespace", "Pod", "Container Image Name", "Node" respectively.
+- If a new field needs to be extracted, or the default mappings need to be modified, add "kubernetes_metadata_keys_mapping" in the match block as shown below.
+  - When adding a new field mapping, ensure the corresponding Logging Analytics field is already defined.
+
+   ```
+   <match **>
+      @type oci-logging-analytics
+      namespace <namespace>                        #REPLACE_ME
+      config_file_location ~/.oci/config           #REPLACE_ME
+      profile_name DEFAULT
+      kubernetes_metadata_keys_mapping             {"container_name":"Container","namespace_name":"Namespace","pod_name":"Pod","container_image":"Container Image Name","host":"Node"}
+      <buffer>
+         @type file
+         path /var/log/fluent_oci_outplugin/buffer/  #REPLACE_ME
+         disable_chunk_backup true
+      </buffer>
+   </match>
+   ```
+
+## How to make Fluentd process the log data from the beginning of a file when using tail input plugin ?
+- The default behaviour of the tail plugin is to read from the latest position (the tail). This behaviour can be altered via the "read_from_head" parameter.
+- Below is an example tail plugin configuration to read from the beginning of a file named foo.log
+
+   ```
+   <source>
+      @type tail
+      <parse>
+         @type none
+      </parse>
+      path foo.log
+      pos_file foo.pos
+      tag oci.foo
+      read_from_head true
+   </source>
+   ```
+
+## How to make Fluentd process the last line from the file when using tail input plugin ?
+- In case of multi-line events, consumption of the last line might be delayed until the next log message is written to the log file, since Fluentd only parses the last line once a line break is appended at its end.
+- To fix this, add/increase the multiline_flush_interval property in the source block.
+
+   ```
+   <source>
+      @type tail
+      multiline_flush_interval 5s
+      <parse>
+         @type multiline
+         format_firstline /\d{4}-[01]\d-[0-3]\d\s[0-2]\d((:[0-5]\d)?){2}\s+(\w+)\s+\[([\w-]+)?\]\s([\w._$]+)\s+([-\w]+)\s+(.*)/
+         format1 /^(?<message>.*)/
+      </parse>
+      path foo.log
+      pos_file foo.pos
+      tag oci.foo
+      read_from_head true
+   </source>
+   ```
+
+## In multi worker setup, prometheus is not displaying all the worker's metrics. How to fix it ?
+- In a multi worker setup, each worker needs its own port binding. To ensure the prometheus engine scrapes metrics from all these ports, provide "aggregated_metrics_path /aggregated_metrics" in the prometheus source config as shown below.
+- Multi worker config example
+
+   ```
+   <source>
+      @type prometheus
+      bind 0.0.0.0
+      port 24231
+      aggregated_metrics_path /aggregated_metrics
+   </source>
+   ```
+
+- Single worker config example
+
+   ```
+   <source>
+      @type prometheus
+      bind 0.0.0.0
+      port 24231
+      metrics_path /metrics
+   </source>
+   ```
+
+## Why am I getting this error - "ConcatFilter::TimeoutError" ?
+- This error occurs when using the Concat plugin to handle multiline log messages.
+- When the incoming log flow is very slow, the concat plugin throws this error to avoid waiting indefinitely for the next multiline start expression match.
+- This issue can be avoided by increasing the flush_interval of the concat filter to an appropriate value.
+- We recommend using "timeout_label" to redirect the corresponding log messages and handle them appropriately to avoid data loss.
+- When using "timeout_label", you may ignore this error.
+
+   ```
+   # Concat filter to handle multi-line log records.
+   <filter **>
+      @type concat
+      key message
+      flush_interval 15
+      timeout_label @NORMAL
+      multiline_start_regexp /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z/
+      separator ""
+   </filter>
+
+   ```
+
+
+## Fluentd is failing to parse the log data. What can be the reason ?
+- In the source block, check the regex/format_firstline expression and verify that it matches your log data.
+- Sample logs:
+   ```
+   2021-02-06 01:44:03 +0000 [warn]: #0 dump an error event:
+   error_class=Fluent::Plugin::Parser::ParserError error="pattern not match with data
+   ```
+- Multiline
+
+   ```
+   <source>
+      @type tail
+      multiline_flush_interval 5s
+      <parse>
+         @type multiline
+         format_firstline /\d{4}-[01]\d-[0-3]\d\s[0-2]\d((:[0-5]\d)?){2}\s+(\w+)\s+\[([\w-]+)?\]\s([\w._$]+)\s+([-\w]+)\s+(.*)/
+         format1 /^(?<message>.*)/
+      </parse>
+      path foo.log
+      pos_file foo.pos
+      tag oci.foo
+      read_from_head true
+   </source>
+   ```
+
+- Regexp
+
+   ```
+   <source>
+      @type tail
+      multiline_flush_interval 5s
+      # regexp
+      <parse>
+         @type regexp
+         expression ^(?<field1>[^ ]*) (?<field2>[^ ]*) (?<field3>\d*)$
+      </parse>
+      path foo.log
+      pos_file foo.pos
+      tag oci.foo
+      read_from_head true
+   </source>
+   ```
\ No newline at end of file
diff --git a/README.md b/README.md
index 17fd947..223048e 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,7 @@ Refer [Prerequisites](https://docs.oracle.com/en/learn/oci_logging_analytics_flu
    rubyzip
    oci
    prometheus-client
+   yajl-ruby
 >= 2.0.0
@@ -29,6 +30,7 @@ Refer [Prerequisites](https://docs.oracle.com/en/learn/oci_logging_analytics_flu
 ~> 2.3.2
 ~> 2.16
 ~> 2.1.0
+~>1.4.3
@@ -59,6 +61,21 @@ Or install it manually as:
 ### Buffer Configuration
 
 - [Buffer configuration parameters](https://docs.oracle.com/en/learn/oci_logging_analytics_fluentd/#buffer-configuration-parameters)
+  - Note* - Buffer type 'file' is recommended but no longer mandatory. Customers can configure any of the other available options.
+
+#### Advantages of the 'file' based Buffer plugin
+  - In case of a fast input plugin and a slow output plugin, the buffer keeps growing, and a file based buffer with the default 50GB limit (configurable to a higher value) can absorb the output plugin delay.
+    A memory based buffer is rarely configured with that much headroom, which would result in data loss.
+
+  - In case the back-end service is not available (5XX exceptions), the output plugin keeps retrying with the existing chunk while new chunks keep getting scheduled.
+    With each chunk being 2MB in size and a chunk interval of 30 sec, a 30 min outage (it can be longer in unforeseen cases) needs a minimum of 120MB of memory allocated for a memory buffer.
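+    (Worked out: a 30 min outage at one 2MB chunk per 30 sec is 60 chunks, i.e. 60 x 2MB = 120MB.)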
+
+  - In any container based log collection, as the logs are not saved in the containers and are completely lost if not consumed, a file based buffer provides a persistent buffer implementation.
+
+  - For container based deployments, while creating the fluentd config file, all these edge cases need to be considered and proper sizing done to come up with the memory size.
+
+  - As data loss is very critical and not all customers are aware of these cases, we had prevented the memory buffer. Having said that, we have now removed that limitation and let informed customers do the proper sizing and decide which option works best for them.
+
 
 ### Input Plugin Configuration
@@ -89,35 +106,43 @@ Refer [Viewing the Logs in Logging Analytics](https://docs.oracle.com/en/learn/o
 
 ## Metrics
 
-The plugin emits following metrics in Prometheus format, which provides stats/insights about the data being collected and processed by the plugin. Refer [monitoring-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus) for details on how to expose these and other various Fluentd metrics to Prometheus (*If the requirement is to collect and monitor core Fluentd and this plugin metrics alone using Prometheus then Step1 and Step2 from the referred document can be skipped*).
+The plugin emits the following metrics in Prometheus format, which provide stats/insights about the data being collected and processed by the plugin.
+Refer [monitoring-prometheus](https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus) for details on how to expose these and other various Fluentd metrics to Prometheus (*If the requirement is to collect and monitor core Fluentd and this plugin's metrics alone using Prometheus, then Step1 and Step2 from the referred document can be skipped*).
+
+#### Note
+For prometheus metrics to work properly, please add 'tag' and 'worker_id' (in case of multi worker configuration) to the filter block.
+
+    tag ${tag}
+    worker_id ${ENV['SERVERENGINE_WORKER_ID']}
+
+#### Metrics details
 
 Metric Name: oci_la_fluentd_output_plugin_records_received
-    labels: [:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set]
+    labels: [:worker_id,:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set]
     Description: Number of records received by the OCI Logging Analytics Fluentd output plugin.
     Type : Gauge
 
 Metric Name: oci_la_fluentd_output_plugin_records_valid
-    labels: [:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set]
+    labels: [:worker_id,:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set]
    Description: Number of valid records received by the OCI Logging Analytics Fluentd output plugin.
    Type : Gauge
 
 Metric Name: oci_la_fluentd_output_plugin_records_invalid
-    labels: [:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set,:reason]
+    labels: [:worker_id,:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set,:reason]
    Description: Number of invalid records received by the OCI Logging Analytics Fluentd output plugin.
    Type : Gauge
 
 Metric Name: oci_la_fluentd_output_plugin_records_post_error
-    labels: [:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set,:error_code, :reason]
+    labels: [:worker_id,:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set,:error_code, :reason]
    Description: Number of records failed posting to OCI Logging Analytics by the Fluentd output plugin.
    Type : Gauge
 
 Metric Name: oci_la_fluentd_output_plugin_records_post_success
-    labels: [:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set]
+    labels: [:worker_id,:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set]
    Description: Number of records posted by the OCI Logging Analytics Fluentd output plugin.
    Type : Gauge
 
 Metric Name: oci_la_fluentd_output_plugin_chunk_time_to_receive
-    labels: [:tag]
+    labels: [:worker_id,:tag]
    Description: Average time taken by Fluentd to deliver the collected records from Input plugin to OCI Logging Analytics output plugin.
    Type : Histogram
 
@@ -126,6 +151,10 @@ The plugin emits following metrics in Prometheus format, which provides stats/in
    Description: Average time taken for posting the received records to OCI Logging Analytics by the Fluentd output plugin.
    Type : Histogram
 
+## FAQ
+
+See [FAQ](FAQ.md).
+
 ## Changes
 
diff --git a/THIRD_PARTY_LICENSES.txt b/THIRD_PARTY_LICENSES.txt
index f04c4c9..8caf51c 100644
--- a/THIRD_PARTY_LICENSES.txt
+++ b/THIRD_PARTY_LICENSES.txt
@@ -4,6 +4,7 @@
 - BSD 2-clause
 - Ruby License
 - UPL-1.0
+- MIT
 
 -------------------------------- Notices ----------------------------------------
 ======================== Third Party Components =================================
@@ -25,12 +26,26 @@ oci
 * Source code: https://github.com/oracle/oci-java-sdk
 * Project home: https://docs.cloud.oracle.com/en-us/iaas/Content/API/SDKDocs/rubysdk.htm
 
+prometheus-client
+* Copyright 2013-2015 The Prometheus Authors
+* License: Apache License 2.0
+* Source code: https://github.com/prometheus/client_ruby
+* Project home: https://github.com/prometheus/client_ruby
+
+yajl-ruby
+* Copyright (c) 2014 Brian Lopez
+* License: MIT
+* Source code: https://github.com/brianmario/yajl-ruby
+* Project home: https://github.com/brianmario/yajl-ruby
+
 =============================== Licenses ========================================
 
 -------------------------- Apache License 2.0 -----------------------------------
 
 Copyright 2011-2018 Fluentd Authors
 
+Copyright 2013-2015 The Prometheus Authors
+
 Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
 
                                  Apache License
@@ -362,3 +377,29 @@ OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
 DAMAGE.
 
 ---------------------------------------------------------------------------------
+
+------------------------------- MIT ---------------------------------------------
+
+The MIT License (MIT)
+
+Copyright (c) 2014 Brian Lopez
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+ +--------------------------------------------------------------------------------- diff --git a/examples/apache.conf b/examples/apache.conf index 94fd70e..756e386 100644 --- a/examples/apache.conf +++ b/examples/apache.conf @@ -1,61 +1,64 @@ # The following label is to ignore Fluentd warning events. - @type tail - @id in_tail_apacheError - path /var/log/apacheError.log - pos_file /var/log/fluentd-apacheError.log.pos - read_from_head true - path_key tailed_path - tag oci.apacheError - - @type none - + @type tail + @id in_tail_apacheError + path /var/log/apacheError.log + pos_file /var/log/fluentd-apacheError.log.pos + read_from_head true + path_key tailed_path + tag oci.apacheError + + @type none + + + +# Add below prometheus config block only when you need output plugin metrics. + + @type prometheus + bind 0.0.0.0 + port 24231 + metrics_path /metrics - @type record_transformer - enable_ruby true - - oci_la_global_metadata ${{: , :}} - oci_la_entity_id # If same across sources. Else keep this in individual filters - oci_la_entity_type # If same across sources. Else keep this in individual filters - + @type record_transformer + enable_ruby true + + oci_la_global_metadata ${{: , :}} + oci_la_entity_id # If same across sources. Else keep this in individual filters + oci_la_entity_type # If same across sources. Else keep this in individual filters + - @type record_transformer - enable_ruby true - - oci_la_metadata ${{: , :}} - oci_la_log_source_name - oci_la_log_group_id - oci_la_log_path "${record['tailed_path']}" - tag ${tag} - + @type record_transformer + enable_ruby true + + oci_la_metadata ${{: , :}} + oci_la_log_source_name + oci_la_log_group_id + oci_la_log_path "${record['tailed_path']}" + tag ${tag} + - @type oci-logging-analytics + @type oci-logging-analytics namespace - -# Auth config file details + # Auth config file details config_file_location ~/.oci/config profile_name DEFAULT - -# Configuration for plugin (oci-logging-analytics) generated logs - plugin_log_location /var/log - plugin_log_level info -# Buffer Configuration + # Buffer Configuration @type file - Path /var/log + path /var/log retry_forever true disable_chunk_backup true diff --git a/examples/kafka.conf b/examples/kafka.conf index 51083d9..aeba2a9 100644 --- a/examples/kafka.conf +++ b/examples/kafka.conf @@ -1,62 +1,65 @@ # The following label is to ignore Fluentd warning events. - @type tail - @id in_tail_kafka - path /var/log/kafka.log - pos_file /var/log/fluentd-kafka.log.pos - read_from_head true - path_key tailed_path - tag oci.kafka - - @type json - + @type tail + @id in_tail_kafka + path /var/log/kafka.log + pos_file /var/log/fluentd-kafka.log.pos + read_from_head true + path_key tailed_path + tag oci.kafka + + @type json + + + +# Add below prometheus config block only when you need output plugin metrics. + + @type prometheus + bind 0.0.0.0 + port 24231 + metrics_path /metrics - @type record_transformer - enable_ruby true - - oci_la_global_metadata ${{: , :}} - oci_la_entity_id # If same across sources. Else keep this in individual filters - oci_la_entity_type # If same across sources. Else keep this in individual filters - + @type record_transformer + enable_ruby true + + oci_la_global_metadata ${{: , :}} + oci_la_entity_id # If same across sources. Else keep this in individual filters + oci_la_entity_type # If same across sources. 
Else keep this in individual filters
+
 
-  @type record_transformer
-  enable_ruby true
-
-    oci_la_metadata ${{: , :}}
-    oci_la_log_source_name
-    oci_la_log_group_id
-    oci_la_log_path "${record['tailed_path']}"
-    message ${record["log"]} # Will assign the 'log' key value from json wrapped message to 'message' field
-    tag ${tag}
-
+  @type record_transformer
+  enable_ruby true
+
+    oci_la_metadata ${{: , :}}
+    oci_la_log_source_name
+    oci_la_log_group_id
+    oci_la_log_path "${record['tailed_path']}"
+    message ${record["log"]} # Will assign the 'log' key value from json wrapped message to 'message' field
+    tag ${tag}
+
 
-  @type oci-logging-analytics
+  @type oci-logging-analytics
   namespace
-
-# Auth config file details
+  # Auth config file details
   config_file_location ~/.oci/config
   profile_name DEFAULT
-
-# Configuration for plugin (oci-logging-analytics) generated logs
-  plugin_log_location /var/log
-  plugin_log_level info
-# Buffer Configuration
+  # Buffer Configuration
 
    @type file
-   Path /var/log
+   path /var/log
    retry_forever true
    disable_chunk_backup true
diff --git a/examples/multi_worker.conf b/examples/multi_worker.conf
new file mode 100644
index 0000000..613bca1
--- /dev/null
+++ b/examples/multi_worker.conf
@@ -0,0 +1,189 @@
+# Four worker setup.
+
+  workers 4
+
+
+# The below prometheus source block is applicable to all workers whose plugins support the multi-process workers feature.
+# For these workers, the prometheus port will be the original port provided (in this example 24232) + worker_id.
+
+  @type prometheus
+  bind 0.0.0.0
+  port 24232
+  aggregated_metrics_path /aggregated_metrics
+
+
+# worker 0 - HTTP
+
+
+  @type http
+  port 9880
+  bind 0.0.0.0
+  tag oci.apacheError
+
+  @type none
+
+
+
+
+  @type record_transformer
+  enable_ruby
+
+    oci_la_metadata ${{: , :}}
+    oci_la_log_source_name
+    oci_la_log_group_id
+    oci_la_log_path "${record['tailed_path']}"
+    tag ${tag}
+    worker_id 0
+
+
+
+
+  @type oci-logging-analytics
+  namespace
+  # Auth config file details
+  config_file_location ~/.oci/config
+  profile_name DEFAULT
+  # Buffer Configuration
+
+    @type file
+    path /var/log
+    retry_forever true
+    disable_chunk_backup true
+
+
+
+# worker 1 - UDP
+
+
+  @type udp
+  tag oci.audit.log # required
+  port 20001 # optional. 5160 by default
+  bind 0.0.0.0 # optional. 0.0.0.0 by default
+  message_length_limit 1MB # optional. 4096 bytes by default
+
+  @type none
+
+
+
+
+  @type record_transformer
+  enable_ruby
+
+    oci_la_metadata ${{: , :}}
+    oci_la_log_source_name
+    oci_la_log_group_id
+    oci_la_log_path "${record['tailed_path']}"
+    tag ${tag}
+    worker_id 1
+
+
+
+
+  @type oci-logging-analytics
+  namespace
+  # Auth config file details
+  config_file_location ~/.oci/config
+  profile_name DEFAULT
+  # Buffer Configuration
+
+    @type file
+    path /var/log
+    retry_forever true
+    disable_chunk_backup true
+
+
+
+# worker 2 - TAIL
+
+
+  @type tail
+  @id in_tail_apacheError
+  path /var/log/apacheError.log
+  pos_file /var/log/fluentd-apacheError.log.pos
+  read_from_head true
+  path_key tailed_path
+  tag oci.apacheError
+
+  @type none
+
+
+
+# As the tail plugin does not support the multi-process workers feature, we need to provide a dedicated prometheus source block with the exact port for this worker, as shown below.
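+# (For example: base port 24232 + worker_id 2 = 24234, which is why the block below binds port 24234 explicitly.)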
+
+  @type prometheus
+  bind 0.0.0.0
+  port 24234
+  aggregated_metrics_path /aggregated_metrics
+
+
+  @type record_transformer
+  enable_ruby
+
+    oci_la_metadata ${{: , :}}
+    oci_la_log_source_name
+    oci_la_log_group_id
+    oci_la_log_path "${record['tailed_path']}"
+    tag ${tag}
+    worker_id 2
+
+
+
+
+  @type oci-logging-analytics
+  namespace
+  # Auth config file details
+  config_file_location ~/.oci/config
+  profile_name DEFAULT
+  # Buffer Configuration
+
+    @type file
+    path /var/log
+    retry_forever true
+    disable_chunk_backup true
+
+
+
+
+# worker 3 - TCP, with 'memory' buffer
+
+
+  @type tcp
+  tag oci.apache.kafka # required
+  port 5170 # optional. 5170 by default
+  bind 0.0.0.0 # optional. 0.0.0.0 by default
+
+  @type none
+
+
+
+
+  @type record_transformer
+  enable_ruby
+
+    oci_la_metadata ${{: , :}}
+    oci_la_log_source_name
+    oci_la_log_group_id
+    oci_la_log_path "${record['tailed_path']}"
+    tag ${tag}
+    worker_id 3
+
+
+
+
+  @type oci-logging-analytics
+  namespace
+  # Auth config file details
+  config_file_location ~/.oci/config
+  profile_name DEFAULT
+  # Buffer Configuration
+
+    @type memory
+    retry_forever true
+
+
+
\ No newline at end of file
diff --git a/examples/syslog.conf b/examples/syslog.conf
index 6513fc2..1147f23 100644
--- a/examples/syslog.conf
+++ b/examples/syslog.conf
@@ -1,64 +1,67 @@
 # The following label is to ignore Fluentd warning events.
 
-  @type tail
-  @id in_tail_syslog
-  multiline_flush_interval 5s
-  path /var/log/messages*
-  pos_file /var/log/messages*.log.pos
-  read_from_head true
-  path_key tailed_path
-  tag oci.syslog
-
-  @type multiline
-  format_firstline /^\S+\s+\d{1,2}\s+\d{1,2}:\d{1,2}:\d{1,2}\s+/
-  format1 /^(?<message>.*)/
-
+  @type tail
+  @id in_tail_syslog
+  multiline_flush_interval 5s
+  path /var/log/messages*
+  pos_file /var/log/messages*.log.pos
+  read_from_head true
+  path_key tailed_path
+  tag oci.syslog
+
+  @type multiline
+  format_firstline /^\S+\s+\d{1,2}\s+\d{1,2}:\d{1,2}:\d{1,2}\s+/
+  format1 /^(?<message>.*)/
+
+
+
+# Add below prometheus config block only when you need output plugin metrics.
+
+  @type prometheus
+  bind 0.0.0.0
+  port 24231
+  metrics_path /metrics
 
-  @type record_transformer
-  enable_ruby true
-
-    oci_la_global_metadata ${{: , :}}
-    oci_la_entity_id # If same across sources. Else keep this in individual filters
-    oci_la_entity_type # If same across sources.
Else keep this in individual filters + - @type record_transformer - enable_ruby true - - oci_la_metadata ${{: , :}} - oci_la_log_source_name - oci_la_log_group_id - oci_la_log_path "${record['tailed_path']}" - tag ${tag} - + @type record_transformer + enable_ruby true + + oci_la_metadata ${{: , :}} + oci_la_log_source_name + oci_la_log_group_id + oci_la_log_path "${record['tailed_path']}" + tag ${tag} + - @type oci-logging-analytics + @type oci-logging-analytics namespace - -# Auth config file details + # Auth config file details config_file_location ~/.oci/config profile_name DEFAULT - -# Configuration for plugin (oci-logging-analytics) generated logs - plugin_log_location /var/log - plugin_log_level info -# Buffer Configuration + # Buffer Configuration @type file - Path /var/log + path /var/log retry_forever true disable_chunk_backup true diff --git a/fluent-plugin-oci-logging-analytics.gemspec b/fluent-plugin-oci-logging-analytics.gemspec index 9ad6551..a75d49a 100755 --- a/fluent-plugin-oci-logging-analytics.gemspec +++ b/fluent-plugin-oci-logging-analytics.gemspec @@ -6,7 +6,7 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) Gem::Specification.new do |spec| spec.name = "fluent-plugin-oci-logging-analytics" - spec.version = "2.0.4" + spec.version = "2.0.5" spec.authors = ["Oracle","OCI Observability: Logging Analytics"] spec.email = ["oci_la_plugins_grp@oracle.com"] @@ -31,5 +31,6 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency 'rubyzip', '~> 2.3.2' spec.add_runtime_dependency "oci", "~>2.16" spec.add_runtime_dependency "prometheus-client", "~>4.0.0" + spec.add_runtime_dependency "yajl-ruby", '~> 1.4', '>= 1.4.3' end diff --git a/lib/fluent/metrics/metricsLabels.rb b/lib/fluent/metrics/metricsLabels.rb index d52c85b..4201a42 100644 --- a/lib/fluent/metrics/metricsLabels.rb +++ b/lib/fluent/metrics/metricsLabels.rb @@ -1,6 +1,7 @@ class MetricsLabels - attr_accessor :tag, :logGroupId, :logSourceName, :logSet, :invalid_reason, :records_valid, :records_per_tag, :latency + attr_accessor :worker_id, :tag, :logGroupId, :logSourceName, :logSet, :invalid_reason, :records_valid, :records_per_tag, :latency def initialize + @worker_id = nil @tag = nil @logGroupId = nil @logSourceName = nil diff --git a/lib/fluent/metrics/prometheusMetrics.rb b/lib/fluent/metrics/prometheusMetrics.rb index d8fc60d..3db7c23 100644 --- a/lib/fluent/metrics/prometheusMetrics.rb +++ b/lib/fluent/metrics/prometheusMetrics.rb @@ -14,16 +14,15 @@ def initialize end def createMetrics gauge = Prometheus::Client::Gauge - @records_received = gauge.new(:oci_la_fluentd_output_plugin_records_received, docstring: 'Number of records received by the OCI Logging Analytics Fluentd output plugin.', labels: [:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set]) - @records_valid = gauge.new(:oci_la_fluentd_output_plugin_records_valid, docstring: 'Number of valid records received by the OCI Logging Analytics Fluentd output plugin.', labels: [:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set]) - @records_invalid = gauge.new(:oci_la_fluentd_output_plugin_records_invalid, docstring: 'Number of invalid records received by the OCI Logging Analytics Fluentd output plugin.', labels: [:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set,:reason]) - @records_error = gauge.new(:oci_la_fluentd_output_plugin_records_post_error, docstring: 'Number of records failed posting to OCI Logging Analytics by the Fluentd output plugin.', labels: 
[:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set,:error_code, :reason]) - @records_posted = gauge.new(:oci_la_fluentd_output_plugin_records_post_success, docstring: 'Number of records posted by the OCI Logging Analytics Fluentd output plugin.', labels: [:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set]) - #@bytes_received = gauge.new(:oci_la_bytes_received, docstring: '...', labels: [:tag]) - #@bytes_posted = gauge.new(:oci_la_bytes_posted, docstring: '...', labels: [:oci_la_log_group_id]) + @records_received = gauge.new(:oci_la_fluentd_output_plugin_records_received, docstring: 'Number of records received by the OCI Logging Analytics Fluentd output plugin.', labels: [:worker_id,:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set]) + @records_valid = gauge.new(:oci_la_fluentd_output_plugin_records_valid, docstring: 'Number of valid records received by the OCI Logging Analytics Fluentd output plugin.', labels: [:worker_id,:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set]) + @records_invalid = gauge.new(:oci_la_fluentd_output_plugin_records_invalid, docstring: 'Number of invalid records received by the OCI Logging Analytics Fluentd output plugin.', labels: [:worker_id,:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set,:reason]) + @records_error = gauge.new(:oci_la_fluentd_output_plugin_records_post_error, docstring: 'Number of records failed posting to OCI Logging Analytics by the Fluentd output plugin.', labels: [:worker_id,:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set,:error_code, :reason]) + @records_posted = gauge.new(:oci_la_fluentd_output_plugin_records_post_success, docstring: 'Number of records posted by the OCI Logging Analytics Fluentd output plugin.', labels: [:worker_id,:tag,:oci_la_log_group_id,:oci_la_log_source_name,:oci_la_log_set]) + histogram = Prometheus::Client::Histogram - @chunk_time_to_receive = histogram.new(:oci_la_fluentd_output_plugin_chunk_time_to_receive, docstring: 'Average time taken by Fluentd to deliver the collected records from Input plugin to OCI Logging Analytics output plugin.', labels: [:tag]) - @chunk_time_to_upload = histogram.new(:oci_la_fluentd_output_plugin_chunk_time_to_post, docstring: 'Average time taken for posting the received records to OCI Logging Analytics by the Fluentd output plugin.', labels: [:oci_la_log_group_id]) + @chunk_time_to_receive = histogram.new(:oci_la_fluentd_output_plugin_chunk_time_to_receive, docstring: 'Average time taken by Fluentd to deliver the collected records from Input plugin to OCI Logging Analytics output plugin.', labels: [:worker_id,:tag]) + @chunk_time_to_upload = histogram.new(:oci_la_fluentd_output_plugin_chunk_time_to_post, docstring: 'Average time taken for posting the received records to OCI Logging Analytics by the Fluentd output plugin.', labels: [:worker_id,:oci_la_log_group_id]) end def registerMetrics @@ -33,8 +32,6 @@ def registerMetrics registry.register(@records_invalid) unless registry.exist?('oci_la_fluentd_output_plugin_records_invalid') registry.register(@records_error) unless registry.exist?('oci_la_fluentd_output_plugin_records_post_error') registry.register(@records_posted) unless registry.exist?('oci_la_fluentd_output_plugin_records_post_success') - #registry.register(@bytes_received) unless registry.exist?('oci_la_bytes_received') - #registry.register(@bytes_posted) unless registry.exist?('oci_la_bytes_valid') registry.register(@chunk_time_to_receive) unless 
registry.exist?('oci_la_fluentd_output_plugin_chunk_time_to_receive') registry.register(@chunk_time_to_upload) unless registry.exist?('oci_la_fluentd_output_plugin_chunk_time_to_post') end diff --git a/lib/fluent/plugin/out_oci-logging-analytics.rb b/lib/fluent/plugin/out_oci-logging-analytics.rb index cdfcc27..a0e4ed7 100755 --- a/lib/fluent/plugin/out_oci-logging-analytics.rb +++ b/lib/fluent/plugin/out_oci-logging-analytics.rb @@ -4,6 +4,8 @@ require 'fluent/plugin/output' require "benchmark" require 'zip' +require 'yajl' +require 'yajl/json_gem' require 'logger' require_relative '../dto/logEventsJson' @@ -75,6 +77,8 @@ class OutOracleOCILogAnalytics < Output @@loganalytics_client = nil @@prometheusMetrics = nil @@logger_config_errors = [] + @@worker_id = '0' + @@encoded_messages_count = 0 desc 'OCI Tenancy Namespace.' @@ -92,7 +96,7 @@ class OutOracleOCILogAnalytics < Output desc 'Payload zip File Location.' config_param :zip_file_location, :string, :default => nil desc 'The kubernetes_metadata_keys_mapping.' - config_param :kubernetes_metadata_keys_mapping, :hash, :default => {"container_name":"Kubernetes Container Name","namespace_name":"Kubernetes Namespace Name","pod_name":"Kubernetes Pod Name","container_image":"Kubernetes Container Image","host":"Kubernetes Node Name","master_url":"Kubernetes Master Url"} + config_param :kubernetes_metadata_keys_mapping, :hash, :default => {"container_name":"Container","namespace_name":"Namespace","pod_name":"Pod","container_image":"Container Image Name","host":"Node"} #**************************************************************** @@ -146,7 +150,7 @@ class OutOracleOCILogAnalytics < Output desc 'The number of threads of output plugins, which is used to write chunks in parallel.' config_set_default :flush_thread_count, 1 desc 'The max size of each chunks: events will be written into chunks until the size of chunks become this size.' - config_set_default :chunk_limit_size, 2 * 1024 * 1024 # 2MB + config_set_default :chunk_limit_size, 4 * 1024 * 1024 # 4MB desc 'The size limitation of this buffer plugin instance.' config_set_default :total_limit_size, 5 * (1024**3) # 5GB desc 'Flush interval' @@ -170,67 +174,72 @@ def initialize end def initialize_logger() - filename = nil - is_default_log_location = false - if is_valid(@plugin_log_location) - filename = @plugin_log_location[-1] == '/' ? @plugin_log_location : @plugin_log_location +'/' - else - is_default_log_location = true - end - if !is_valid_log_level(@plugin_log_level) - @plugin_log_level = @@default_log_level - end - oci_fluent_output_plugin_log = nil - if is_default_log_location - oci_fluent_output_plugin_log = 'oci-logging-analytics.log' - else - oci_fluent_output_plugin_log = filename+'oci-logging-analytics.log' - end - - logger_config = nil - - if is_valid_number_of_logs(@plugin_log_file_count) && is_valid_log_size(@plugin_log_file_size) - # When customer provided valid log_file_count and log_file_size. - # logger will rotate with max log_file_count with each file having max log_file_size. - # Older logs purged automatically. - @@logger = Logger.new(oci_fluent_output_plugin_log, @plugin_log_file_count, @@validated_log_size) - logger_config = 'USER_CONFIG' - elsif is_valid_log_rotation(@plugin_log_rotation) - # When customer provided only log_rotation. - # logger will create a new log based on log_rotation (new file everyday if the rotation is daily). - # This will create too many logs over a period of time as log purging is not done. 
- @@logger = Logger.new(oci_fluent_output_plugin_log, @plugin_log_rotation) - logger_config = 'FALLBACK_CONFIG' - else - # When customer provided invalid log config, default config is considered. - # logger will rotate with max default log_file_count with each file having max default log_file_size. - # Older logs purged automatically. - @@logger = Logger.new(oci_fluent_output_plugin_log, @@default_number_of_logs, @@default_log_size) - logger_config = 'DEFAULT_CONFIG' - end - - logger_set_level(@plugin_log_level) + begin + filename = nil + is_default_log_location = false + if is_valid(@plugin_log_location) + filename = @plugin_log_location[-1] == '/' ? @plugin_log_location : @plugin_log_location +'/' + else + @@logger = log + return + end + if !is_valid_log_level(@plugin_log_level) + @plugin_log_level = @@default_log_level + end + oci_fluent_output_plugin_log = nil + if is_default_log_location + oci_fluent_output_plugin_log = 'oci-logging-analytics.log' + else + oci_fluent_output_plugin_log = filename+'oci-logging-analytics.log' + end + logger_config = nil + + if is_valid_number_of_logs(@plugin_log_file_count) && is_valid_log_size(@plugin_log_file_size) + # When customer provided valid log_file_count and log_file_size. + # logger will rotate with max log_file_count with each file having max log_file_size. + # Older logs purged automatically. + @@logger = Logger.new(oci_fluent_output_plugin_log, @plugin_log_file_count, @@validated_log_size) + logger_config = 'USER_CONFIG' + elsif is_valid_log_rotation(@plugin_log_rotation) + # When customer provided only log_rotation. + # logger will create a new log based on log_rotation (new file everyday if the rotation is daily). + # This will create too many logs over a period of time as log purging is not done. + @@logger = Logger.new(oci_fluent_output_plugin_log, @plugin_log_rotation) + logger_config = 'FALLBACK_CONFIG' + else + # When customer provided invalid log config, default config is considered. + # logger will rotate with max default log_file_count with each file having max default log_file_size. + # Older logs purged automatically. + @@logger = Logger.new(oci_fluent_output_plugin_log, @@default_number_of_logs, @@default_log_size) + logger_config = 'DEFAULT_CONFIG' + end - @@logger.info {"Initializing oci-logging-analytics plugin"} - if is_default_log_location - @@logger.info {"plugin_log_location is not specified. oci-logging-analytics.log will be generated under directory from where fluentd is executed."} - end + logger_set_level(@plugin_log_level) + @@logger.info {"Initializing oci-logging-analytics plugin"} + if is_default_log_location + @@logger.info {"plugin_log_location is not specified. oci-logging-analytics.log will be generated under directory from where fluentd is executed."} + end - case logger_config - when 'USER_CONFIG' - @@logger.info {"Logger for oci-logging-analytics.log is initialized with config values log size: #{@plugin_log_file_size}, number of logs: #{@plugin_log_file_count}"} - when 'FALLBACK_CONFIG' - @@logger.info {"Logger for oci-logging-analytics.log is initialized with log rotation: #{@plugin_log_rotation}"} - when 'DEFAULT_CONFIG' - @@logger.info {"Logger for oci-logging-analytics.log is initialized with default config values log size: #{@@default_log_size}, number of logs: #{@@default_number_of_logs}"} - end - if @@logger_config_errors.length > 0 - @@logger_config_errors. 
each {|logger_config_error| - @@logger.warn {"#{logger_config_error}"} - } - end - if is_valid_log_age(@plugin_log_age) - @@logger.warn {"'plugin_log_age' field is deprecated. Use 'plugin_log_file_size' and 'plugin_log_file_count' instead."} + case logger_config + when 'USER_CONFIG' + @@logger.info {"Logger for oci-logging-analytics.log is initialized with config values log size: #{@plugin_log_file_size}, number of logs: #{@plugin_log_file_count}"} + when 'FALLBACK_CONFIG' + @@logger.info {"Logger for oci-logging-analytics.log is initialized with log rotation: #{@plugin_log_rotation}"} + when 'DEFAULT_CONFIG' + @@logger.info {"Logger for oci-logging-analytics.log is initialized with default config values log size: #{@@default_log_size}, number of logs: #{@@default_number_of_logs}"} + end + if @@logger_config_errors.length > 0 + @@logger_config_errors. each {|logger_config_error| + @@logger.warn {"#{logger_config_error}"} + } + end + if is_valid_log_age(@plugin_log_age) + @@logger.warn {"'plugin_log_age' field is deprecated. Use 'plugin_log_file_size' and 'plugin_log_file_count' instead."} + end + rescue => ex + @@logger = log + @@logger.error {"Error while initializing logger:#{ex.inspect}"} + @@logger.info {"Redirecting oci logging analytics logs to STDOUT"} end end @@ -275,8 +284,8 @@ def configure(conf) initialize_logger initialize_loganalytics_client - @@logger.error {"Error in config file : Buffer plugin must be of @type file."} unless buffer_config['@type'] == 'file' - raise Fluent::ConfigError, "Error in config file : Buffer plugin must be of @type file." unless buffer_config['@type'] == 'file' + #@@logger.error {"Error in config file : Buffer plugin must be of @type file."} unless buffer_config['@type'] == 'file' + #raise Fluent::ConfigError, "Error in config file : Buffer plugin must be of @type file." unless buffer_config['@type'] == 'file' is_mandatory_fields_valid,invalid_field_name = mandatory_field_validator if !is_mandatory_fields_valid @@ -288,8 +297,8 @@ def configure(conf) unless conf.elements(name: 'buffer').empty? buffer_conf = conf.elements(name: 'buffer').first chunk_limit_size_from_conf = buffer_conf['chunk_limit_size'] - unless chunk_limit_size_from_conf.nil? - log.debug "chunk limit size as per the configuration file is #{chunk_limit_size_from_conf}" + unless chunk_limit_size_from_conf.nil? 
&& buffer_config['@type'] != 'file'
+        @@logger.debug "chunk limit size as per the configuration file is #{chunk_limit_size_from_conf}"
         case chunk_limit_size_from_conf.to_s
           when /([0-9]+)k/i
             chunk_limit_size_bytes = $~[1].to_i * 1024
@@ -299,13 +308,13 @@ def configure(conf)
             chunk_limit_size_bytes = $~[1].to_i * (1024 ** 3)
           when /([0-9]+)t/i
             chunk_limit_size_bytes = $~[1].to_i * (1024 ** 4)
-          else
-            raise Fluent::ConfigError, "error parsing chunk_limit_size"
+          #else
+            #raise Fluent::ConfigError, "error parsing chunk_limit_size"
         end
-        log.debug "chunk limit size in bytes as per the configuration file is #{chunk_limit_size_bytes}"
-        if !chunk_limit_size_bytes.between?(1048576, 2097152)
-          raise Fluent::ConfigError, "chunk_limit_size must be between 1MB and 2MB"
+        @@logger.debug "chunk limit size in bytes as per the configuration file is #{chunk_limit_size_bytes}"
+        if chunk_limit_size_bytes != nil && !chunk_limit_size_bytes.between?(1048576, 4194304)
+          raise Fluent::ConfigError, "chunk_limit_size must be between 1MB and 4MB"
         end
       end
     end
@@ -577,7 +586,7 @@ def get_kubernetes_metadata(oci_la_metadata,record)
       kubernetes_metadata.each do |key, value|
         if kubernetes_metadata_keys_mapping.has_key?(key)
           if !is_valid(oci_la_metadata[kubernetes_metadata_keys_mapping[key]])
-            oci_la_metadata[kubernetes_metadata_keys_mapping[key]] = json_message_handler(value)
+            oci_la_metadata[kubernetes_metadata_keys_mapping[key]] = json_message_handler(key, value)
           end
         end
       end
@@ -588,14 +597,21 @@ def get_kubernetes_metadata(oci_la_metadata,record)
     return oci_la_metadata
   end
 
-  def json_message_handler(message)
-    if message.is_a?(Hash)
-      return JSON.generate(message)
-    else
-      return message
-    end
-    rescue => ex
-      return message
+  def json_message_handler(key, message)
+    begin
+      if !is_valid(message)
+        return nil
+      end
+      if message.is_a?(Hash)
+        return Yajl.dump(message) #JSON.generate(message)
+      end
+      return message
+    rescue => ex
+      @@logger.error {"Error occurred while generating json for
+                      field: #{key}
+                      exception : #{ex}"}
+      return nil
+    end
   end
 
   def group_by_logGroupId(chunk)
@@ -624,6 +640,10 @@ def group_by_logGroupId(chunk)
           if !record.nil?
             begin
               record_hash = record.keys.map {|x| [x,true]}.to_h
+              if record_hash.has_key?("worker_id") && is_valid(record["worker_id"])
+                metricsLabels.worker_id = record["worker_id"] ||= '0'
+                @@worker_id = record["worker_id"] ||= '0'
+              end
               is_tag_exists = false
               if record_hash.has_key?("tag") && is_valid(record["tag"])
                 is_tag_exists = true
@@ -702,22 +722,23 @@ def group_by_logGroupId(chunk)
               if record["oci_la_log_set"] != nil
                 metricsLabels.logSet = record["oci_la_log_set"]
               end
+              record["message"] = json_message_handler("message", record["message"])
+
+              # This will check for null or empty messages; only such records will be ignored.
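+              # Note: json_message_handler (above) returns nil both for originally-empty
+              # messages and for Hash messages that Yajl fails to serialize, so either
+              # case lands here and the record is skipped with the metrics updated.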
if !is_valid(record["message"]) metricsLabels.invalid_reason = OutOracleOCILogAnalytics::METRICS_INVALID_REASON_MESSAGE if is_tag_exists - @@logger.warn {"'message' field has empty value, Skipping records associated with tag : #{record["tag"]}."} if invalid_records_per_tag.has_key?(record["tag"]) invalid_records_per_tag[record["tag"]] += 1 else invalid_records_per_tag[record["tag"]] = 1 + @@logger.warn {"'message' field is empty or encoded, Skipping records associated with tag : #{record["tag"]}."} end else - @@logger.warn {"'message' field has empty value, Skipping record."} + @@logger.warn {"'message' field is empty or encoded, Skipping record."} end next - else - record["message"] = json_message_handler(record["message"]) end if record_hash.has_key?("kubernetes") @@ -773,7 +794,7 @@ def group_by_logGroupId(chunk) tag_metrics_set.each do |tag,metricsLabels| latency_avg = (metricsLabels.latency / metricsLabels.records_per_tag).round(3) - @@prometheusMetrics.chunk_time_to_receive.observe(latency_avg, labels: { tag: tag}) + @@prometheusMetrics.chunk_time_to_receive.observe(latency_avg, labels: { worker_id: metricsLabels.worker_id, tag: tag}) end lrpes_for_logGroupId = {} @@ -827,17 +848,20 @@ def write(chunk) logGroup_metrics_map[metricsLabels.logGroupId] = metricsLabels_array end - @@prometheusMetrics.records_received.set(value.to_i, labels: { tag: key, + @@prometheusMetrics.records_received.set(value.to_i, labels: { worker_id: metricsLabels.worker_id, + tag: key, oci_la_log_group_id: metricsLabels.logGroupId, oci_la_log_source_name: metricsLabels.logSourceName, oci_la_log_set: metricsLabels.logSet}) - @@prometheusMetrics.records_invalid.set(dropped_messages, labels: { tag: key, + @@prometheusMetrics.records_invalid.set(dropped_messages, labels: { worker_id: metricsLabels.worker_id, + tag: key, oci_la_log_group_id: metricsLabels.logGroupId, oci_la_log_source_name: metricsLabels.logSourceName, oci_la_log_set: metricsLabels.logSet, reason: metricsLabels.invalid_reason}) - @@prometheusMetrics.records_valid.set(valid_messages, labels: { tag: key, + @@prometheusMetrics.records_valid.set(valid_messages, labels: { worker_id: metricsLabels.worker_id, + tag: key, oci_la_log_group_id: metricsLabels.logGroupId, oci_la_log_source_name: metricsLabels.logSourceName, oci_la_log_set: metricsLabels.logSet}) @@ -875,7 +899,7 @@ def write(chunk) end end }.real.round(3) - @@prometheusMetrics.chunk_time_to_upload.observe(chunk_upload_time_taken, labels: { oci_la_log_group_id: oci_la_log_group_id}) + @@prometheusMetrics.chunk_time_to_upload.observe(chunk_upload_time_taken, labels: { worker_id: @@worker_id, oci_la_log_group_id: oci_la_log_group_id}) end ensure @@ -956,7 +980,7 @@ def get_zipped_stream(oci_la_log_group_id,oci_la_global_metadata,records_per_log @@logger.debug {"Added entry #{nextEntry} for oci_la_log_set #{oci_la_log_set} into the zip."} zos.put_next_entry(nextEntry) logEventsJsonFinal = LogEventsJson.new(oci_la_global_metadata,lrpes_for_logEvents) - zos.write logEventsJsonFinal.to_hash.to_json + zos.write Yajl.dump(logEventsJsonFinal.to_hash) end } zippedstream.rewind @@ -1002,13 +1026,15 @@ def upload_to_oci(oci_la_log_group_id, number_of_records, zippedstream, metricsL opts) if !response.nil? 
&& response.status == 200 then headers = response.headers - - metricsLabels_array.each { |metricsLabels| - @@prometheusMetrics.records_posted.set(metricsLabels.records_valid, labels: { tag: metricsLabels.tag, - oci_la_log_group_id: metricsLabels.logGroupId, - oci_la_log_source_name: metricsLabels.logSourceName, - oci_la_log_set: metricsLabels.logSet}) - } + if metricsLabels_array != nil + metricsLabels_array.each { |metricsLabels| + @@prometheusMetrics.records_posted.set(metricsLabels.records_valid, labels: { worker_id: metricsLabels.worker_id, + tag: metricsLabels.tag, + oci_la_log_group_id: metricsLabels.logGroupId, + oci_la_log_source_name: metricsLabels.logSourceName, + oci_la_log_set: metricsLabels.logSet}) + } + end #zippedstream.rewind #reposition buffer pointer to the beginning #zipfile = zippedstream&.sysread&.dup @@ -1087,9 +1113,10 @@ def upload_to_oci(oci_la_log_group_id, number_of_records, zippedstream, metricsL error_reason = ex @@logger.error {"oci upload exception : Error while uploading the payload. #{ex}"} ensure - if error_reason != nil + if error_reason != nil && metricsLabels_array != nil metricsLabels_array.each { |metricsLabels| - @@prometheusMetrics.records_error.set(metricsLabels.records_valid, labels: { tag: metricsLabels.tag, + @@prometheusMetrics.records_error.set(metricsLabels.records_valid, labels: {worker_id: metricsLabels.worker_id, + tag: metricsLabels.tag, oci_la_log_group_id: metricsLabels.logGroupId, oci_la_log_source_name: metricsLabels.logSourceName, oci_la_log_set: metricsLabels.logSet,