Skip to content
This repository has been archived by the owner on Dec 17, 2021. It is now read-only.

Commit

Permalink
feat: add integration for bulk and get operations
Browse files Browse the repository at this point in the history
  • Loading branch information
omrozowicz-splunk committed Aug 17, 2021
1 parent c6c54eb commit d4f494d
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 38 deletions.
72 changes: 50 additions & 22 deletions splunk_connect_for_snmp_poller/manager/hec_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@
import requests
from celery.utils.log import get_logger

from splunk_connect_for_snmp_poller.manager.static.mib_enricher import MibEnricher

logger = get_logger(__name__)


def post_data_to_splunk_hec(
host,
logs_endpoint,
metrics_endpoint,
variables_binds,
is_metric,
index,
one_time_flag=False,
mib_enricher=None
host,
logs_endpoint,
metrics_endpoint,
variables_binds,
is_metric,
index,
one_time_flag=False,
mib_enricher=None
):
logger.debug(f"[-] logs : {logs_endpoint}, metrics : {metrics_endpoint}")

Expand All @@ -55,14 +57,9 @@ def post_event_data(endpoint, host, variables_binds, index, one_time_flag=False,
variables_binds = "error: " + str(variables_binds)

elif mib_enricher:
variables_binds = json.loads(variables_binds)
metric_result = json.loads(variables_binds["metric"])
non_metric_result = variables_binds["non_metric"]
mib_enricher.process_one(metric_result)
for field_name in mib_enricher.dimensions_fields:
if field_name in metric_result:
non_metric_result += f"{field_name}=\"{metric_result[field_name]}\" "
variables_binds = non_metric_result
variables_binds = _enrich_event_data(mib_enricher, json.loads(variables_binds))
elif "non_metric" in variables_binds:
variables_binds = json.loads(variables_binds)["non_metric"]

data = {
"time": time.time(),
Expand All @@ -89,18 +86,42 @@ def post_event_data(endpoint, host, variables_binds, index, one_time_flag=False,
logger.error(f"Connection error when sending data to HEC index - {index}: {e}")


def post_metric_data(endpoint, host, variables_binds, index, mib_enricher=None):
def _enrich_event_data(mib_enricher: MibEnricher, variables_binds: dict) -> str:
    """Append the enricher's extra dimensions to the non-metric varbind string.

    @param mib_enricher: MibEnricher holding the additional dimensions configured
        in the enricher config.
    @param variables_binds: dict with "metric_name", "metric" (JSON-encoded metric
        form of the varbind) and "non_metric" (string form of the same varbind),
        for example:
        {'metric': '{"metric_name": "sc4snmp.IF-MIB.ifPhysAddress_1", "_value": "", "metric_type": "OctetString"}',
         'metric_name': 'sc4snmp.IF-MIB.ifPhysAddress_1',
         'non_metric': 'oid-type1="ObjectIdentity" value1-type="OctetString" 1.3.6.1.2.1.2.2.1.6.1=""
         value1="" IF-MIB::ifPhysAddress.1="" '}
        Both forms are needed because process_one() only understands the metric
        (dict) form, while the non-metric form is a hard-to-parse flat string.
    @return: the non-metric varbind string with each additional dimension appended
        as key="value" pairs, e.g. for [interface_index, interface_desc]:
        'oid-type1="ObjectIdentity" value1-type="OctetString" 1.3.6.1.2.1.2.2.1.6.1="" value1="" IF-MIB::ifPhysAddress.1=""
        interface_index="1" interface_desc="lo" '
    """
    parsed_metric = json.loads(variables_binds["metric"])
    mib_enricher.process_one(parsed_metric)
    # Render every enricher dimension that process_one() added onto the dict.
    extra_dimensions = "".join(
        f'{name}="{parsed_metric[name]}" '
        for name in mib_enricher.dimensions_fields
        if name in parsed_metric
    )
    return variables_binds["non_metric"] + extra_dimensions


def post_metric_data(endpoint, host, variables_binds, index, mib_enricher=None):
json_val = json.loads(variables_binds)
if mib_enricher:
mib_enricher.process_one(json_val)
metric_name = json_val["metric_name"]
metric_value = json_val["_value"]
fields = {"metric_name:" + metric_name: metric_value}
if mib_enricher:
for field_name in mib_enricher.dimensions_fields:
if field_name in json_val:
fields[field_name] = json_val[field_name]
_enrich_metric_data(mib_enricher, json_val, fields)

data = {
"time": time.time(),
Expand All @@ -119,3 +140,10 @@ def post_metric_data(endpoint, host, variables_binds, index, mib_enricher=None):
logger.debug(f"Response is {response.text}")
except requests.ConnectionError as e:
logger.error(f"Connection error when sending data to HEC index - {index}: {e}")


def _enrich_metric_data(mib_enricher: MibEnricher, variables_binds: dict, fields: dict) -> None:
    """Copy the enricher's additional dimensions into the HEC ``fields`` dict.

    @param mib_enricher: MibEnricher holding the configured extra dimensions.
    @param variables_binds: metric varbind as a dict; enriched in place by
        ``process_one``.
    @param fields: HEC metric fields dict, mutated in place — one entry is added
        per dimension that the enricher produced for this varbind.
    """
    mib_enricher.process_one(variables_binds)
    fields.update(
        {
            name: variables_binds[name]
            for name in mib_enricher.dimensions_fields
            if name in variables_binds
        }
    )
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class InterfaceMib:
METRIC_VALUE_KEY = "_value"
METRIC_TYPE_KEY = "metric_type"
NON_METRIC_IDENTIFIER = "ifDescr"
MONGO_IDENTIFIER = "IF-MIB"
IF_MIB_METRIC_SUFFIX = "sc4snmp.IF-MIB."
IF_MIB_IF_NUMBER = "sc4snmp.IF-MIB.ifNumber_0"
IF_MIB_IF_INDEX_BASE = "sc4snmp.IF-MIB.ifIndex_"
Expand Down
2 changes: 2 additions & 0 deletions splunk_connect_for_snmp_poller/manager/static/mib_enricher.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ def __init__(self, mib_static_data_collection):

def __collect_if_mib_fields(self, mib_static_data_collection):
fields = []
if not mib_static_data_collection:
return []
for el in mib_static_data_collection:
fields += list(el.keys())
logger.info(f"_mib_static_data_collection: {mib_static_data_collection}")
Expand Down
43 changes: 32 additions & 11 deletions splunk_connect_for_snmp_poller/manager/task_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,7 @@ def snmp_get_handler(
)
)
if not _any_failure_happened(errorIndication, errorStatus, errorIndex, varBinds):
processed_data = mongo_connection.static_data_for(f"{host}:{port}")
mib_enricher, return_multimetric = _enrich_response(processed_data)
mib_enricher, return_multimetric = _enrich_response(mongo_connection, f"{host}:{port}")
for varbind in varBinds:
result, is_metric = get_translated_string(mib_server_url, [varbind], return_multimetric)
post_data_to_splunk_hec(
Expand All @@ -234,7 +233,8 @@ def snmp_get_handler(
)


def _enrich_response(processed_data):
def _enrich_response(mongo_connection, hostname):
processed_data = mongo_connection.static_data_for(hostname)
if processed_data:
mib_enricher = MibEnricher(processed_data)
return_multimetric = True
Expand Down Expand Up @@ -303,7 +303,7 @@ def snmp_bulk_handler(
# Bulk operation returns array of varbinds
for varbind in varBinds:
processed_data = mongo_connection.static_data_for(f"{host}:{port}")
mib_enricher, return_multimetric = _enrich_response(processed_data)
mib_enricher, return_multimetric = _enrich_response(processed_data, f"{host}:{port}")
logger.debug(f"Bulk returned this varbind: {varbind}")
result, is_metric = get_translated_string(mib_server_url, [varbind], return_multimetric)
logger.info(result)
Expand Down Expand Up @@ -382,13 +382,7 @@ def walk_handler(
break
else:
result, is_metric = get_translated_string(mib_server_url, varBinds, True)
if is_metric:
merged_result_metric.append(result)
merged_result.append(eval(result))
else:
merged_result_non_metric.append(result)
result = eval(result)
merged_result.append(eval(result["metric"]))
_sort_walk_data(is_metric, merged_result_metric, merged_result_non_metric, merged_result, result)

processed_result = extract_network_interface_data_from_walk(
enricher, merged_result
Expand All @@ -405,12 +399,39 @@ def walk_handler(
logger.info(f"***************** After metric *****************")


def _sort_walk_data(is_metric: bool, merged_result_metric: list, merged_result_non_metric: list, merged_result: list,
varbind):
"""
In WALK operation we can have three scenarios:
1. mongo db is empty and we want to insert enricher mapping into it
2. mongo db already has some data and we just need to use it
3. we don't have any enricher given in config so we're not adding any extra dimensions
Because of different structure of metric/non-metric data we need to divide varbinds on 3 categories.
@param is_metric: is current varbind metric
@param merged_result_metric: list containing metric varbinds
@param merged_result_non_metric: list containing non-metric varbinds
@param merged_result: list containing metric varbinds and metric versions of non-metric varbinds (necessary for 1st
scenario.
@param varbind: current varbind
@return:
"""
if is_metric:
merged_result_metric.append(varbind)
merged_result.append(eval(varbind))
else:
merged_result_non_metric.append(varbind)
result = eval(varbind)
merged_result.append(eval(result["metric"]))


def _return_mib_enricher_for_walk(mongo_connection, processed_result, hostname):
    """Build the MibEnricher to use after a WALK, persisting fresh data if any.

    @param mongo_connection: repository used to read/write cached enricher data
    @param processed_result: enricher data extracted from the current walk, or a
        falsy value when the walk produced none
    @param hostname: "host:port" key identifying the walked device in Mongo
    @return: a MibEnricher built from the fresh result (which is also written to
        Mongo), from the cached Mongo data otherwise, or None when neither exists.
    """
    if processed_result:
        # Fresh data from this walk wins — cache it for later GET/BULK polls.
        mongo_connection.update_mib_static_data_for(hostname, processed_result)
        return MibEnricher(processed_result)
    cached_data = mongo_connection.static_data_for(hostname)
    return MibEnricher(cached_data) if cached_data else None


Expand Down
4 changes: 1 addition & 3 deletions splunk_connect_for_snmp_poller/manager/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

from celery.utils.log import get_task_logger
from pysnmp.hlapi import ObjectIdentity, ObjectType, SnmpEngine
from splunk_connect_for_snmp_poller.manager.celery_client import app
from splunk_connect_for_snmp_poller.manager.static.interface_mib_utililities import \
extract_network_interface_data_from_config

from splunk_connect_for_snmp_poller.manager.task_utilities import (
VarbindCollection,
build_authData,
Expand Down
9 changes: 7 additions & 2 deletions splunk_connect_for_snmp_poller/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
from pymongo import MongoClient, ReturnDocument
from pymongo.errors import ConnectionFailure

from splunk_connect_for_snmp_poller.manager.realtime.interface_mib import InterfaceMib

"""
In order to store some general data into Mongo we use the following structure.
Each WalkedHostsRepository can contain the following fields:
Expand Down Expand Up @@ -116,7 +118,10 @@ def real_time_data_for(self, host):
def static_data_for(self, host):
    """Return the cached IF-MIB enricher data for *host*, or None.

    @param host: "host:port" document id of the walked device
    @return: the value stored under MIB_STATIC_DATA -> IF-MIB for this host,
        or None when the host was never walked, has no static data, or the
        static data lacks the IF-MIB section.
    """
    full_collection = self._walked_hosts.find_one({"_id": host})
    # find_one returns None for an unknown host; the previous code would then
    # raise TypeError on the `in` membership test.
    if not full_collection:
        return None
    mib_static_data = full_collection.get(WalkedHostsRepository.MIB_STATIC_DATA)
    if not mib_static_data:
        return None
    return mib_static_data.get(InterfaceMib.MONGO_IDENTIFIER)

Expand All @@ -135,7 +140,7 @@ def update_real_time_data_for(self, host, input_dictionary):
def update_mib_static_data_for(self, host, if_mib_data):
if if_mib_data:
real_time_data_dictionary = {
WalkedHostsRepository.MIB_STATIC_DATA: if_mib_data
WalkedHostsRepository.MIB_STATIC_DATA: {InterfaceMib.MONGO_IDENTIFIER: if_mib_data}
}
self._walked_hosts.find_one_and_update(
{"_id": host},
Expand Down

0 comments on commit d4f494d

Please sign in to comment.