From edf4fc993a0a9bd88f9c1b8e402d1e14dd742bee Mon Sep 17 00:00:00 2001 From: Oleksandr Kuzhel Date: Wed, 22 Sep 2021 10:01:15 +0200 Subject: [PATCH] adding frequency to all metric data and profile if it is set in conf --- .../manager/data/event_builder.py | 10 ++- .../manager/data/inventory_record.py | 20 ++++- .../manager/hec_sender.py | 35 ++++++-- .../manager/poller.py | 2 +- .../manager/poller_utilities.py | 2 +- .../manager/task_utilities.py | 30 +++++++ .../manager/tasks.py | 11 ++- tests/data/test_event_builder.py | 25 ++++++ tests/data/test_inventory_record.py | 87 +++++++++++++++++++ 9 files changed, 209 insertions(+), 13 deletions(-) create mode 100644 tests/data/test_inventory_record.py diff --git a/splunk_connect_for_snmp_poller/manager/data/event_builder.py b/splunk_connect_for_snmp_poller/manager/data/event_builder.py index c4bb1cf..5c6e7ab 100644 --- a/splunk_connect_for_snmp_poller/manager/data/event_builder.py +++ b/splunk_connect_for_snmp_poller/manager/data/event_builder.py @@ -19,6 +19,11 @@ logger = get_logger(__name__) +""" +You can check how the events for HEC should be formatted here +https://docs.splunk.com/Documentation/Splunk/8.2.2/Data/FormateventsforHTTPEventCollector +""" + class EventBuilder: def __init__(self) -> None: @@ -27,6 +32,9 @@ def __init__(self) -> None: def add(self, field, part: Any) -> None: self.data[field.value] = part + def add_fields(self, part: Dict) -> None: + self.data[EventField.FIELDS.value] = part + def is_one_time_walk(self, one_time_flag: bool) -> None: if one_time_flag: self.data[EventField.SOURCETYPE.value] = EventType.WALK.value @@ -42,7 +50,7 @@ class EventField(Enum): HOST = "host" INDEX = "index" EVENT = "event" - FREQUENCY = "frequency" + FREQUENCY = "freqinseconds" PROFILE = "profile" FIELDS = "fields" diff --git a/splunk_connect_for_snmp_poller/manager/data/inventory_record.py b/splunk_connect_for_snmp_poller/manager/data/inventory_record.py index 3f77b70..68617cc 100644 --- a/splunk_connect_for_snmp_poller/manager/data/inventory_record.py +++ b/splunk_connect_for_snmp_poller/manager/data/inventory_record.py @@ -15,6 +15,8 @@ import json from dataclasses import dataclass +from splunk_connect_for_snmp_poller.manager.data.event_builder import EventField + @dataclass class InventoryRecord: @@ -24,5 +26,21 @@ class InventoryRecord: profile: str frequency_str: str - def toJson(self): + def to_json(self) -> str: return json.dumps(self, default=lambda o: o.__dict__) + + def extend_dict_with_provided_data( + self, fields: dict, additional_metric_fields: list + ) -> dict: + if ( + additional_metric_fields + and EventField.PROFILE.value in additional_metric_fields + ): + fields[EventField.PROFILE.value] = self.profile + + return fields + + @staticmethod + def from_json(ir_json: str): + ir_dict = json.loads(ir_json) + return InventoryRecord(**ir_dict) diff --git a/splunk_connect_for_snmp_poller/manager/hec_sender.py b/splunk_connect_for_snmp_poller/manager/hec_sender.py index 5b2dc87..91aa7bd 100644 --- a/splunk_connect_for_snmp_poller/manager/hec_sender.py +++ b/splunk_connect_for_snmp_poller/manager/hec_sender.py @@ -24,6 +24,7 @@ EventField, EventType, ) +from splunk_connect_for_snmp_poller.manager.data.inventory_record import InventoryRecord from splunk_connect_for_snmp_poller.manager.static.mib_enricher import MibEnricher logger = get_logger(__name__) @@ -36,6 +37,8 @@ def post_data_to_splunk_hec( variables_binds, is_metric, index, + ir, + additional_metric_fields, one_time_flag=False, mib_enricher=None, ): @@ -44,7 +47,13 @@ def post_data_to_splunk_hec( if is_metric: logger.debug(f"+++++++++metric index: {index['metric_index']} +++++++++") post_metric_data( - metrics_endpoint, host, variables_binds, index["metric_index"], mib_enricher + metrics_endpoint, + host, + variables_binds, + index["metric_index"], + ir, + additional_metric_fields, + mib_enricher, ) else: logger.debug(f"*********event index: {index['event_index']} ********") @@ -86,9 +95,9 @@ def post_event_data( ) -def init_builder_with_common_data(time, host, index) -> EventBuilder: +def init_builder_with_common_data(current_time, host, index) -> EventBuilder: builder = EventBuilder() - builder.add(EventField.TIME, time) + builder.add(EventField.TIME, current_time) builder.add(EventField.HOST, host) builder.add(EventField.INDEX, index) return builder @@ -135,17 +144,31 @@ def _enrich_event_data(mib_enricher: MibEnricher, variables_binds: dict) -> str: return non_metric_result -def post_metric_data(endpoint, host, variables_binds, index, mib_enricher=None): +def post_metric_data( + endpoint, + host, + variables_binds, + index, + ir: InventoryRecord, + additional_metric_fields, + mib_enricher=None, +): json_val = json.loads(variables_binds) metric_name = json_val["metric_name"] metric_value = json_val["_value"] - fields = {"metric_name:" + metric_name: metric_value} + fields = { + "metric_name:" + metric_name: metric_value, + EventField.FREQUENCY.value: ir.frequency_str, + } if mib_enricher: _enrich_metric_data(mib_enricher, json_val, fields) + if additional_metric_fields: + fields = ir.extend_dict_with_provided_data(fields, additional_metric_fields) + builder = init_builder_with_common_data(time.time(), host, index) builder.add(EventField.EVENT, EventType.METRIC.value) - builder.add(EventField.FIELDS, fields) + builder.add_fields(fields) data = builder.build() diff --git a/splunk_connect_for_snmp_poller/manager/poller.py b/splunk_connect_for_snmp_poller/manager/poller.py index d5d1e06..3035ca8 100644 --- a/splunk_connect_for_snmp_poller/manager/poller.py +++ b/splunk_connect_for_snmp_poller/manager/poller.py @@ -182,4 +182,4 @@ def __start_realtime_scheduler_task(self): def scheduled_task(ir: InventoryRecord, server_config, splunk_indexes): logger.debug("Executing scheduled_task for %s", ir.__repr__()) - snmp_polling.delay(ir.toJson(), server_config, splunk_indexes) + snmp_polling.delay(ir.to_json(), server_config, splunk_indexes) diff --git a/splunk_connect_for_snmp_poller/manager/poller_utilities.py b/splunk_connect_for_snmp_poller/manager/poller_utilities.py index 8a54f66..76fdc91 100644 --- a/splunk_connect_for_snmp_poller/manager/poller_utilities.py +++ b/splunk_connect_for_snmp_poller/manager/poller_utilities.py @@ -50,7 +50,7 @@ def onetime_task(inventory_record: InventoryRecord, server_config, splunk_indexe logger.debug("Executing onetime_task for %s", inventory_record.__repr__()) snmp_polling.delay( - inventory_record.toJson(), + inventory_record.to_json(), server_config, splunk_indexes, one_time_flag=True, diff --git a/splunk_connect_for_snmp_poller/manager/task_utilities.py b/splunk_connect_for_snmp_poller/manager/task_utilities.py index e772240..8103224 100644 --- a/splunk_connect_for_snmp_poller/manager/task_utilities.py +++ b/splunk_connect_for_snmp_poller/manager/task_utilities.py @@ -213,6 +213,8 @@ async def snmp_get_handler( otel_logs_url, otel_metrics_url, one_time_flag, + ir, + additional_metric_fields, var_binds, ): """ @@ -244,6 +246,8 @@ async def snmp_get_handler( result, is_metric, index, + ir, + additional_metric_fields, one_time_flag, mib_enricher, ) @@ -297,6 +301,8 @@ def _any_walk_failure_happened( otel_metrics_url, one_time_flag, is_metric, + ir, + additional_metric_fields, varBinds, ): if errorIndication: @@ -309,6 +315,8 @@ def _any_walk_failure_happened( result, is_metric, index, + ir, + additional_metric_fields, one_time_flag, ) return True @@ -324,6 +332,8 @@ def _any_walk_failure_happened( result, is_metric, index, + ir, + additional_metric_fields, one_time_flag, ) return True @@ -344,6 +354,8 @@ async def snmp_bulk_handler( otel_logs_url, otel_metrics_url, one_time_flag, + ir, + additional_metric_fields, var_binds, ): """ @@ -379,6 +391,8 @@ async def snmp_bulk_handler( result, is_metric, index, + ir, + additional_metric_fields, one_time_flag, mib_enricher, ) @@ -396,6 +410,8 @@ async def walk_handler( otel_logs_url, otel_metrics_url, one_time_flag, + ir, + additional_metric_fields, ): """ Perform the SNMP Walk for oid end with *, @@ -422,6 +438,8 @@ async def walk_handler( otel_metrics_url, one_time_flag, is_metric, + ir, + additional_metric_fields, varBinds, ): break @@ -434,6 +452,8 @@ async def walk_handler( result, is_metric, index, + ir, + additional_metric_fields, one_time_flag, ) @@ -452,6 +472,8 @@ async def walk_handler_with_enricher( otel_logs_url, otel_metrics_url, one_time_flag, + ir, + additional_metric_fields, ): """ Perform the SNMP Walk for oid end with *, @@ -478,6 +500,8 @@ async def walk_handler_with_enricher( index, otel_logs_url, otel_metrics_url, + ir, + additional_metric_fields, one_time_flag, is_metric, varBinds, @@ -511,6 +535,8 @@ async def walk_handler_with_enricher( otel_metrics_url, index, one_time_flag, + ir, + additional_metric_fields, mib_enricher, ] _post_walk_data_to_splunk( @@ -579,6 +605,8 @@ def _post_walk_data_to_splunk( otel_metrics_url, index, one_time_flag, + ir, + additional_metric_fields, mib_enricher, ): for result in result_list: @@ -589,6 +617,8 @@ def _post_walk_data_to_splunk( result, is_metric, index, + ir, + additional_metric_fields, one_time_flag, mib_enricher, ) diff --git a/splunk_connect_for_snmp_poller/manager/tasks.py b/splunk_connect_for_snmp_poller/manager/tasks.py index bd07e2c..c2b4dfd 100644 --- a/splunk_connect_for_snmp_poller/manager/tasks.py +++ b/splunk_connect_for_snmp_poller/manager/tasks.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import json import os import threading @@ -64,6 +63,8 @@ async def get_snmp_data( otel_logs_url, otel_metrics_url, one_time_flag, + ir, + additional_metric_fields, ): if var_binds: try: @@ -80,6 +81,8 @@ async def get_snmp_data( otel_logs_url, otel_metrics_url, one_time_flag, + ir, + additional_metric_fields, var_binds, ) except Exception as e: @@ -117,8 +120,7 @@ def sort_varbinds(varbind_list: list) -> VarbindCollection: # TODO remove the debugging statement later @app.task def snmp_polling(ir_json: str, server_config, index, one_time_flag=False): - ir_dict = json.loads(ir_json) - ir = InventoryRecord(**ir_dict) + ir = InventoryRecord.from_json(ir_json) async_to_sync(snmp_polling_async)(ir, server_config, index, one_time_flag) @@ -146,6 +148,7 @@ async def snmp_polling_async( logger.debug("==========context_data=========\n%s", context_data) mongo_connection = WalkedHostsRepository(server_config["mongo"]) + additional_metric_fields = server_config.get("additionalMetricField") enricher_presence = "enricher" in server_config static_parameters = [ snmp_engine, @@ -158,6 +161,8 @@ async def snmp_polling_async( otel_logs_url, otel_metrics_url, one_time_flag, + ir, + additional_metric_fields, ] get_bulk_specific_parameters = [mongo_connection, enricher_presence] diff --git a/tests/data/test_event_builder.py b/tests/data/test_event_builder.py index 8bcb3a8..cf8c82b 100644 --- a/tests/data/test_event_builder.py +++ b/tests/data/test_event_builder.py @@ -72,3 +72,28 @@ def test_event_build_for_walk_event(self): builder.is_one_time_walk(True) self.assertDictEqual(expected, builder.build()) + + def test_event_build_with_added_fields(self): + """ + Test that checks if EventBuilder correctly builds dict with fields + """ + timer = time.time() + some_field_ = {"some": "field"} + expected = { + "time": timer, + "sourcetype": "sc4snmp:meta", + "host": "ourhost", + "index": "meta_index", + "event": "binds", + "fields": some_field_, + } + + builder = EventBuilder() + builder.add(EventField.TIME, timer) + builder.add(EventField.HOST, "ourhost") + builder.add(EventField.INDEX, "meta_index") + builder.add(EventField.SOURCETYPE, "sc4snmp:meta") + builder.add(EventField.EVENT, "binds") + builder.add_fields(some_field_) + + self.assertDictEqual(expected, builder.build()) diff --git a/tests/data/test_inventory_record.py b/tests/data/test_inventory_record.py new file mode 100644 index 0000000..736060e --- /dev/null +++ b/tests/data/test_inventory_record.py @@ -0,0 +1,87 @@ +# +# Copyright 2021 Splunk Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +from unittest import TestCase + +from splunk_connect_for_snmp_poller.manager.data.inventory_record import InventoryRecord + +logger = logging.getLogger(__name__) + + +class TestInventoryRecord(TestCase): + ir = InventoryRecord( + "test_host", "test_version", "test_public", "test_profile", "10" + ) + + def test_inventory_record_to_json(self): + """ + Test that checks if InventoryRecord correctly parses to json + """ + ir = InventoryRecord( + "test_host", "test_version", "test_public", "test_profile", "10" + ) + expected_str = ( + '{"host": "test_host", "version": "test_version", ' + '"community": "test_public", "profile": "test_profile",' + ' "frequency_str": "10"}' + ) + + ir_to_json = ir.to_json() + + self.assertEqual(expected_str, ir_to_json) + + def test_json_to_inventory(self): + """ + Test that checks if json correctly loads to InventoryRecord + """ + ir = InventoryRecord( + "test_host", "test_version", "test_public", "test_profile", "10" + ) + ir_to_json = ir.to_json() + + record_from_json = InventoryRecord.from_json(ir_to_json) + + self.assertEqual(ir, record_from_json) + + def test_extending_dict_with_config_data(self): + """ + Test that checks if fields are correctly extended with profile data + """ + ir = InventoryRecord( + "test_host", "test_version", "test_public", "test_profile", "10" + ) + fields = {"key1": "value"} + expected_fields = {"key1": "value", "profile": "test_profile"} + additional_fields = ["profile"] + + extended_fields = ir.extend_dict_with_provided_data(fields, additional_fields) + + self.assertDictEqual(expected_fields, extended_fields) + + def test_extending_dict_with_empty_data(self): + """ + Test that checks if fields are not changed if additional fields are empty + """ + ir = InventoryRecord( + "test_host", "test_version", "test_public", "test_profile", "10" + ) + fields = {"key1": "value"} + additional_fields = None + + extended_fields = ir.extend_dict_with_provided_data(fields, additional_fields) + + self.assertDictEqual(fields, extended_fields)