From d4f494d8165aee75e09de968e8aa695c31dc4ff5 Mon Sep 17 00:00:00 2001 From: omrozowicz-splunk Date: Tue, 17 Aug 2021 16:59:43 +0200 Subject: [PATCH] feat: add integration for bulk and get operations --- .../manager/hec_sender.py | 72 +++++++++++++------ .../manager/realtime/interface_mib.py | 1 + .../manager/static/mib_enricher.py | 2 + .../manager/task_utilities.py | 43 ++++++++--- .../manager/tasks.py | 4 +- splunk_connect_for_snmp_poller/mongo.py | 9 ++- 6 files changed, 93 insertions(+), 38 deletions(-) diff --git a/splunk_connect_for_snmp_poller/manager/hec_sender.py b/splunk_connect_for_snmp_poller/manager/hec_sender.py index 9f2bfe7..09284ef 100644 --- a/splunk_connect_for_snmp_poller/manager/hec_sender.py +++ b/splunk_connect_for_snmp_poller/manager/hec_sender.py @@ -19,18 +19,20 @@ import requests from celery.utils.log import get_logger +from splunk_connect_for_snmp_poller.manager.static.mib_enricher import MibEnricher + logger = get_logger(__name__) def post_data_to_splunk_hec( - host, - logs_endpoint, - metrics_endpoint, - variables_binds, - is_metric, - index, - one_time_flag=False, - mib_enricher=None + host, + logs_endpoint, + metrics_endpoint, + variables_binds, + is_metric, + index, + one_time_flag=False, + mib_enricher=None ): logger.debug(f"[-] logs : {logs_endpoint}, metrics : {metrics_endpoint}") @@ -55,14 +57,9 @@ def post_event_data(endpoint, host, variables_binds, index, one_time_flag=False, variables_binds = "error: " + str(variables_binds) elif mib_enricher: - variables_binds = json.loads(variables_binds) - metric_result = json.loads(variables_binds["metric"]) - non_metric_result = variables_binds["non_metric"] - mib_enricher.process_one(metric_result) - for field_name in mib_enricher.dimensions_fields: - if field_name in metric_result: - non_metric_result += f"{field_name}=\"{metric_result[field_name]}\" " - variables_binds = non_metric_result + variables_binds = _enrich_event_data(mib_enricher, json.loads(variables_binds)) + elif "non_metric" in variables_binds: + variables_binds = json.loads(variables_binds)["non_metric"] data = { "time": time.time(), @@ -89,18 +86,42 @@ def post_event_data(endpoint, host, variables_binds, index, one_time_flag=False, logger.error(f"Connection error when sending data to HEC index - {index}: {e}") -def post_metric_data(endpoint, host, variables_binds, index, mib_enricher=None): +def _enrich_event_data(mib_enricher: MibEnricher, variables_binds: dict) -> str: + """ + This function serves for processing event data the way we add additional dimensions configured in enricher config. + @param mib_enricher: MibEnricher object containing additional dimensions + @param variables_binds: dictionary containing "metric_name", "metric" - metric version of varbinds and + "non_metric" - nonmetric version of varbinds, for ex: + + {'metric': '{"metric_name": "sc4snmp.IF-MIB.ifPhysAddress_1", "_value": "", "metric_type": "OctetString"}', + 'metric_name': 'sc4snmp.IF-MIB.ifPhysAddress_1', + 'non_metric': 'oid-type1="ObjectIdentity" value1-type="OctetString" 1.3.6.1.2.1.2.2.1.6.1="" + value1="" IF-MIB::ifPhysAddress.1="" '} + + We need both formats because process_one function was designed to work on metric data only and non metric format is + difficult to process because of the nature of string type. + + @return: non metric varbind with values from additional dimension added. For ex. for additional dimensions: + [interface_index, interface_desc]: + 'oid-type1="ObjectIdentity" value1-type="OctetString" 1.3.6.1.2.1.2.2.1.6.1="" value1="" IF-MIB::ifPhysAddress.1="" + interface_index="1" interface_desc="lo" ' + """ + metric_result = json.loads(variables_binds["metric"]) + non_metric_result = variables_binds["non_metric"] + mib_enricher.process_one(metric_result) + for field_name in mib_enricher.dimensions_fields: + if field_name in metric_result: + non_metric_result += f"{field_name}=\"{metric_result[field_name]}\" " + return non_metric_result + +def post_metric_data(endpoint, host, variables_binds, index, mib_enricher=None): json_val = json.loads(variables_binds) - if mib_enricher: - mib_enricher.process_one(json_val) metric_name = json_val["metric_name"] metric_value = json_val["_value"] fields = {"metric_name:" + metric_name: metric_value} if mib_enricher: - for field_name in mib_enricher.dimensions_fields: - if field_name in json_val: - fields[field_name] = json_val[field_name] + _enrich_metric_data(mib_enricher, json_val, fields) data = { "time": time.time(), @@ -119,3 +140,10 @@ def post_metric_data(endpoint, host, variables_binds, index, mib_enricher=None): logger.debug(f"Response is {response.text}") except requests.ConnectionError as e: logger.error(f"Connection error when sending data to HEC index - {index}: {e}") + + +def _enrich_metric_data(mib_enricher: MibEnricher, variables_binds: dict, fields: dict) -> None: + mib_enricher.process_one(variables_binds) + for field_name in mib_enricher.dimensions_fields: + if field_name in variables_binds: + fields[field_name] = variables_binds[field_name] \ No newline at end of file diff --git a/splunk_connect_for_snmp_poller/manager/realtime/interface_mib.py b/splunk_connect_for_snmp_poller/manager/realtime/interface_mib.py index 96b3f14..7553745 100644 --- a/splunk_connect_for_snmp_poller/manager/realtime/interface_mib.py +++ b/splunk_connect_for_snmp_poller/manager/realtime/interface_mib.py @@ -49,6 +49,7 @@ class InterfaceMib: METRIC_VALUE_KEY = "_value" METRIC_TYPE_KEY = "metric_type" NON_METRIC_IDENTIFIER = "ifDescr" + MONGO_IDENTIFIER = "IF-MIB" IF_MIB_METRIC_SUFFIX = "sc4snmp.IF-MIB." IF_MIB_IF_NUMBER = "sc4snmp.IF-MIB.ifNumber_0" IF_MIB_IF_INDEX_BASE = "sc4snmp.IF-MIB.ifIndex_" diff --git a/splunk_connect_for_snmp_poller/manager/static/mib_enricher.py b/splunk_connect_for_snmp_poller/manager/static/mib_enricher.py index 9f6b57b..00ecdac 100644 --- a/splunk_connect_for_snmp_poller/manager/static/mib_enricher.py +++ b/splunk_connect_for_snmp_poller/manager/static/mib_enricher.py @@ -72,6 +72,8 @@ def __init__(self, mib_static_data_collection): def __collect_if_mib_fields(self, mib_static_data_collection): fields = [] + if not mib_static_data_collection: + return [] for el in mib_static_data_collection: fields += list(el.keys()) logger.info(f"_mib_static_data_collection: {mib_static_data_collection}") diff --git a/splunk_connect_for_snmp_poller/manager/task_utilities.py b/splunk_connect_for_snmp_poller/manager/task_utilities.py index c6b1049..0c4f41f 100644 --- a/splunk_connect_for_snmp_poller/manager/task_utilities.py +++ b/splunk_connect_for_snmp_poller/manager/task_utilities.py @@ -218,8 +218,7 @@ def snmp_get_handler( ) ) if not _any_failure_happened(errorIndication, errorStatus, errorIndex, varBinds): - processed_data = mongo_connection.static_data_for(f"{host}:{port}") - mib_enricher, return_multimetric = _enrich_response(processed_data) + mib_enricher, return_multimetric = _enrich_response(mongo_connection, f"{host}:{port}") for varbind in varBinds: result, is_metric = get_translated_string(mib_server_url, [varbind], return_multimetric) post_data_to_splunk_hec( @@ -234,7 +233,8 @@ def snmp_get_handler( ) -def _enrich_response(processed_data): +def _enrich_response(mongo_connection, hostname): + processed_data = mongo_connection.static_data_for(hostname) if processed_data: mib_enricher = MibEnricher(processed_data) return_multimetric = True @@ -303,7 +303,7 @@ def snmp_bulk_handler( # Bulk operation returns array of varbinds for varbind in varBinds: processed_data = mongo_connection.static_data_for(f"{host}:{port}") - mib_enricher, return_multimetric = _enrich_response(processed_data) + mib_enricher, return_multimetric = _enrich_response(processed_data, f"{host}:{port}") logger.debug(f"Bulk returned this varbind: {varbind}") result, is_metric = get_translated_string(mib_server_url, [varbind], return_multimetric) logger.info(result) @@ -382,13 +382,7 @@ def walk_handler( break else: result, is_metric = get_translated_string(mib_server_url, varBinds, True) - if is_metric: - merged_result_metric.append(result) - merged_result.append(eval(result)) - else: - merged_result_non_metric.append(result) - result = eval(result) - merged_result.append(eval(result["metric"])) + _sort_walk_data(is_metric, merged_result_metric, merged_result_non_metric, merged_result, result) processed_result = extract_network_interface_data_from_walk( enricher, merged_result @@ -405,12 +399,39 @@ def walk_handler( logger.info(f"***************** After metric *****************") +def _sort_walk_data(is_metric: bool, merged_result_metric: list, merged_result_non_metric: list, merged_result: list, + varbind): + """ + In WALK operation we can have three scenarios: + 1. mongo db is empty and we want to insert enricher mapping into it + 2. mongo db already has some data and we just need to use it + 3. we don't have any enricher given in config so we're not adding any extra dimensions + Because of different structure of metric/non-metric data we need to divide varbinds on 3 categories. + @param is_metric: is current varbind metric + @param merged_result_metric: list containing metric varbinds + @param merged_result_non_metric: list containing non-metric varbinds + @param merged_result: list containing metric varbinds and metric versions of non-metric varbinds (necessary for 1st + scenario. + @param varbind: current varbind + @return: + """ + if is_metric: + merged_result_metric.append(varbind) + merged_result.append(eval(varbind)) + else: + merged_result_non_metric.append(varbind) + result = eval(varbind) + merged_result.append(eval(result["metric"])) + + def _return_mib_enricher_for_walk(mongo_connection, processed_result, hostname): if processed_result: mongo_connection.update_mib_static_data_for(hostname, processed_result) return MibEnricher(processed_result) else: processed_data = mongo_connection.static_data_for(hostname) + if not processed_data: + return None return MibEnricher(processed_data) diff --git a/splunk_connect_for_snmp_poller/manager/tasks.py b/splunk_connect_for_snmp_poller/manager/tasks.py index 6a9941b..84aec91 100644 --- a/splunk_connect_for_snmp_poller/manager/tasks.py +++ b/splunk_connect_for_snmp_poller/manager/tasks.py @@ -18,9 +18,7 @@ from celery.utils.log import get_task_logger from pysnmp.hlapi import ObjectIdentity, ObjectType, SnmpEngine -from splunk_connect_for_snmp_poller.manager.celery_client import app -from splunk_connect_for_snmp_poller.manager.static.interface_mib_utililities import \ - extract_network_interface_data_from_config + from splunk_connect_for_snmp_poller.manager.task_utilities import ( VarbindCollection, build_authData, diff --git a/splunk_connect_for_snmp_poller/mongo.py b/splunk_connect_for_snmp_poller/mongo.py index e099c00..e4ce41d 100644 --- a/splunk_connect_for_snmp_poller/mongo.py +++ b/splunk_connect_for_snmp_poller/mongo.py @@ -35,6 +35,8 @@ from pymongo import MongoClient, ReturnDocument from pymongo.errors import ConnectionFailure +from splunk_connect_for_snmp_poller.manager.realtime.interface_mib import InterfaceMib + """ In order to store some general data into Mongo we use the following structure. Each WalkedHostsRepository can contain the following fields: @@ -116,7 +118,10 @@ def real_time_data_for(self, host): def static_data_for(self, host): full_collection = self._walked_hosts.find_one({"_id": host}) if WalkedHostsRepository.MIB_STATIC_DATA in full_collection: - return full_collection[WalkedHostsRepository.MIB_STATIC_DATA] + mib_static_data = full_collection[WalkedHostsRepository.MIB_STATIC_DATA] + if InterfaceMib.MONGO_IDENTIFIER in mib_static_data: + return mib_static_data[InterfaceMib.MONGO_IDENTIFIER] + return None else: return None @@ -135,7 +140,7 @@ def update_real_time_data_for(self, host, input_dictionary): def update_mib_static_data_for(self, host, if_mib_data): if if_mib_data: real_time_data_dictionary = { - WalkedHostsRepository.MIB_STATIC_DATA: if_mib_data + WalkedHostsRepository.MIB_STATIC_DATA: {InterfaceMib.MONGO_IDENTIFIER: if_mib_data} } self._walked_hosts.find_one_and_update( {"_id": host},