Skip to content
This repository has been archived by the owner on Dec 17, 2021. It is now read-only.

Commit

Permalink
fix: implement ttl for enricher (#195)
Browse files Browse the repository at this point in the history
* fix: implement ttl feature for enricher

* fix: update unit tests

* fix: refactor mib_string_handler

* fix: refactor and update unit tests

* fix: provide workaround for old enricher logic to work

* fix: update enricher logic

* fix: delete tests of removed functions

* fix: fix test_hec_sender tests

* fix: fix test_static_config_data tests

* fix: fix test config

* fix: change position of celery import

* fix: delete comment

* fix: add check if we already have job in decit
  • Loading branch information
omrozowicz-splunk committed Nov 10, 2021
1 parent 9166792 commit 25e6029
Show file tree
Hide file tree
Showing 18 changed files with 311 additions and 286 deletions.
48 changes: 47 additions & 1 deletion splunk_connect_for_snmp_poller/manager/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,34 @@
from splunk_connect_for_snmp_poller.manager.poller_utilities import (
automatic_onetime_task,
automatic_realtime_job,
create_poller_enricher_entry_key,
create_poller_scheduler_entry_key,
parse_inventory_file,
return_database_id,
update_enricher_config,
update_inventory_record,
)
from splunk_connect_for_snmp_poller.manager.profile_matching import (
assign_profiles_to_device,
extract_desc,
get_profiles,
)
from splunk_connect_for_snmp_poller.manager.task_utilities import translate_list_to_oid
from splunk_connect_for_snmp_poller.manager.tasks import snmp_polling
from splunk_connect_for_snmp_poller.manager.validator.inventory_validator import (
DYNAMIC_PROFILE,
)
from splunk_connect_for_snmp_poller.manager.variables import onetime_if_walk
from splunk_connect_for_snmp_poller.manager.variables import (
enricher_existing_varbinds,
enricher_if_mib,
enricher_oid_family,
onetime_if_walk,
)
from splunk_connect_for_snmp_poller.mongo import WalkedHostsRepository
from splunk_connect_for_snmp_poller.utilities import (
OnetimeFlag,
file_was_modified,
multi_key_lookup,
parse_config_file,
)

Expand All @@ -57,6 +67,7 @@ def __init__(self, args, server_config):
self._inventory_mod_time = 0
self._config_mod_time = 0
self._jobs_map = {}
self._enricher_jobs_map = {}
self._dynamic_jobs = set()
self._mongo = WalkedHostsRepository(self._server_config["mongo"])
self._local_snmp_engine = SnmpEngine()
Expand Down Expand Up @@ -173,6 +184,7 @@ def __add_enricher_to_a_host(self, current_enricher, ir, new_host=False):
logger.info("Add enricher to a host")
old_enricher = {} if new_host else self._old_enricher
if current_enricher != {}:
self.process_jobs_for_enricher(current_enricher, ir)
update_enricher_config(
old_enricher,
current_enricher,
Expand All @@ -190,6 +202,13 @@ def delete_all_dynamic_entries_per_host(self, host):
del self._jobs_map[entry_key]
self._dynamic_jobs.remove(entry_key)

def delete_all_enricher_entries_per_host(self, host):
for entry_key in list(self._enricher_jobs_map.keys()):
if entry_key.split("#")[0] == host:
logger.debug("Removing job for %s", entry_key)
schedule.cancel_job(self._enricher_jobs_map.get(entry_key))
del self._enricher_jobs_map[entry_key]

def clean_job_inventory(self, inventory_entry_keys: set, inventory_hosts: set):
for entry_key in list(self._jobs_map):
if entry_key not in inventory_entry_keys:
Expand All @@ -203,6 +222,7 @@ def clean_job_inventory(self, inventory_entry_keys: set, inventory_hosts: set):
str(inventory_hosts),
)
self._mongo.delete_host(db_host_id)
self.delete_all_enricher_entries_per_host(db_host_id)
self._mongo.delete_onetime_walk_result(db_host_id)
del self._jobs_map[entry_key]

Expand All @@ -223,6 +243,32 @@ def is_conf_changed(self, entry_key, ir, old_conf):
or frequency != interval
)

def process_jobs_for_enricher(self, enricher, ir):
ifmib_structure = multi_key_lookup(
enricher, (enricher_oid_family, enricher_if_mib, enricher_existing_varbinds)
)
for varbind in ifmib_structure:
ifmib_attr, ttl, = (
varbind["id"],
varbind["ttl"],
)
ifmib_oid = translate_list_to_oid(["IF-MIB", ifmib_attr])
db_host = return_database_id(ir.host)
entry_key = create_poller_enricher_entry_key(db_host, ifmib_attr)
if entry_key in self._enricher_jobs_map:
return
logger.debug("Adding configuration for enricher job %s", entry_key)
new_ir = update_inventory_record(ir, ifmib_oid, ttl)
job_reference = schedule.every(int(ttl)).seconds.do(
snmp_polling,
new_ir.to_json(),
self._server_config,
self.__get_splunk_indexes(),
None,
OnetimeFlag.ENRICHER_UPDATE_WALK.value,
)
self._enricher_jobs_map[entry_key] = job_reference

def process_new_job(self, entry_key, ir, profiles):
acquired_profiles = profiles.get("profiles")
if acquired_profiles is not None and ir.profile not in acquired_profiles:
Expand Down
12 changes: 12 additions & 0 deletions splunk_connect_for_snmp_poller/manager/poller_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import copy
import csv
import logging.config
import threading
Expand Down Expand Up @@ -379,8 +380,19 @@ def create_poller_scheduler_entry_key(host, profile):
return host + "#" + profile


def create_poller_enricher_entry_key(host, ifmib_attribute):
return host + "#" + ifmib_attribute


def return_database_id(host):
if "#" in host:
host = host.split("#")[0]
_host, _port = parse_port(host)
return f"{_host}:{_port}"


def update_inventory_record(original_ir, oid, ttl):
ir = copy.deepcopy(original_ir)
ir.profile = f"{oid}.*"
ir.frequency_str = ttl
return ir
70 changes: 28 additions & 42 deletions splunk_connect_for_snmp_poller/manager/realtime/interface_mib.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from operator import itemgetter


# See http://www.net-snmp.org/docs/mibs/interfaces.html for additional implementation details
Expand All @@ -24,6 +25,7 @@ def extract_if_mib_only(translated_walk_result):
InterfaceMib.METRIC_NAME_KEY,
InterfaceMib.METRIC_VALUE_KEY,
InterfaceMib.METRIC_TYPE_KEY,
InterfaceMib.METRIC_PARSED_INDEX_KEY,
)
)
and translation[InterfaceMib.METRIC_NAME_KEY].startswith(
Expand All @@ -37,67 +39,51 @@ class InterfaceMib:
METRIC_NAME_KEY = "metric_name"
METRIC_VALUE_KEY = "_value"
METRIC_TYPE_KEY = "metric_type"
METRIC_PARSED_INDEX_KEY = "parsed_index"
METRIC_IF_INDEX_KEY = "ifIndex"
IF_MIB_METRIC_PREFIX = "sc4snmp.IF-MIB."
NON_METRIC_IDENTIFIER = "ifDescr"
IF_MIB_DATA_MONGO_IDENTIFIER = "IF-MIB"
IF_MIB_IF_NUMBER = "sc4snmp.IF-MIB.ifNumber_0"
IF_MIB_IF_INDEX_BASE = "sc4snmp.IF-MIB.ifIndex_"
IF_MIB_IF_DESCR_BASE = "sc4snmp.IF-MIB.ifDescr_"

def __init__(self, if_mib_metric_walk_data):
self._if_mib_walk_data = extract_if_mib_only(if_mib_metric_walk_data)
self._full_dictionary = self.__build_in_memory_dictionary()
self._network_interfaces = self.__extract_number_of_network_interfaces()
self._network_indexes = self.__extract_interface_indexes()
self._network_interface_names = self.__extract_interface_names()

def unprocessed_if_mib_data(self):
return self._if_mib_walk_data

def network_interfaces(self):
return self._network_interfaces

def network_indexes(self):
return self._network_indexes

def network_interface_names(self):
return self._network_interface_names

def has_consistent_data(self):
return self.network_interfaces() == len(self.network_indexes()) and len(
self.network_indexes()
) == len(self.network_interface_names())

def __build_in_memory_dictionary(self):
all_keys = {}
for mib in self.unprocessed_if_mib_data():
all_keys[mib[InterfaceMib.METRIC_NAME_KEY]] = {
InterfaceMib.METRIC_VALUE_KEY: mib[InterfaceMib.METRIC_VALUE_KEY],
InterfaceMib.METRIC_TYPE_KEY: mib[InterfaceMib.METRIC_TYPE_KEY],
}
if mib[InterfaceMib.METRIC_NAME_KEY] not in all_keys:
all_keys[mib[InterfaceMib.METRIC_NAME_KEY]] = []
all_keys[mib[InterfaceMib.METRIC_NAME_KEY]].append(
{
InterfaceMib.METRIC_VALUE_KEY: mib[InterfaceMib.METRIC_VALUE_KEY],
InterfaceMib.METRIC_TYPE_KEY: mib[InterfaceMib.METRIC_TYPE_KEY],
InterfaceMib.METRIC_PARSED_INDEX_KEY: self.__get_ifindex(
mib[InterfaceMib.METRIC_PARSED_INDEX_KEY]
),
}
)
sorted(
all_keys[mib[InterfaceMib.METRIC_NAME_KEY]],
key=itemgetter(InterfaceMib.METRIC_PARSED_INDEX_KEY),
)
return all_keys

def __extract_number_of_network_interfaces(self):
if InterfaceMib.IF_MIB_IF_NUMBER in self._full_dictionary:
if_number = self._full_dictionary[InterfaceMib.IF_MIB_IF_NUMBER]
return int(if_number[InterfaceMib.METRIC_VALUE_KEY])
return 0
def __get_ifindex(self, ifindex_dictionary):
return ifindex_dictionary[InterfaceMib.METRIC_IF_INDEX_KEY]

def _return_number_of_interfaces(self):
for value_list in self._full_dictionary.values():
return len(value_list)

def __extract_single_field_as_list(self, base_mib_metric_name):
all_indexes = []
for index in range(0, self.network_interfaces()):
current = base_mib_metric_name + str(index + 1)
if current in self._full_dictionary:
all_indexes.append(
self._full_dictionary[current][InterfaceMib.METRIC_VALUE_KEY]
)
if base_mib_metric_name in self._full_dictionary:
for el in self._full_dictionary[base_mib_metric_name]:
all_indexes.append(el[InterfaceMib.METRIC_VALUE_KEY])
return all_indexes

def __extract_interface_indexes(self):
return self.__extract_single_field_as_list(InterfaceMib.IF_MIB_IF_INDEX_BASE)

def __extract_interface_names(self):
return self.__extract_single_field_as_list(InterfaceMib.IF_MIB_IF_DESCR_BASE)

def extract_custom_field(self, snmp_field_name):
return self.__extract_single_field_as_list(snmp_field_name)
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ class OidConstant:
# see https://www.alvestrand.no/objectid/1.3.6.1.html for a better understanding
UNIVERSAL_BASE_OID = "1.3.6.1.*"
IF_MIB = "1.3.6.1.2.1.2.*"
IF_MIB_PREFIX = "1.3.6.1.2.1.2"
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,26 @@ def extract_network_interface_data_from_existing_config(config_as_dict):
result = []
if splunk_dimensions:
for splunk_dimension in splunk_dimensions:
for key in splunk_dimension.keys():
result.append(
{
"oid_name": f"{InterfaceMib.IF_MIB_METRIC_PREFIX}{key}_",
"splunk_dimension_name": splunk_dimension[key],
}
)
result.append(
{
"oid_name": f"{InterfaceMib.IF_MIB_METRIC_PREFIX}{splunk_dimension['id']}",
"splunk_dimension_name": splunk_dimension["name"],
}
)
logger.debug(f"IF-MIB additional attributes for Splunk: {result}")
return result


def extract_network_interface_data_from_walk(config_as_dict, if_mib_metric_walk_data):
result = []
network_data = InterfaceMib(if_mib_metric_walk_data)
if network_data.has_consistent_data():
enricher_fields = extract_network_interface_data_from_existing_config(
config_as_dict
)
for data in enricher_fields:
splunk_dimension = data["splunk_dimension_name"]
current_result = network_data.extract_custom_field(data["oid_name"])
if current_result:
result.append({f"{splunk_dimension}": current_result})
enricher_fields = extract_network_interface_data_from_existing_config(
config_as_dict
)
for data in enricher_fields:
splunk_dimension = data["splunk_dimension_name"]
current_result = network_data.extract_custom_field(data["oid_name"])
if current_result:
result.append({f"{splunk_dimension}": current_result})

return result
18 changes: 9 additions & 9 deletions splunk_connect_for_snmp_poller/manager/static/mib_enricher.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,13 @@ def extract_current_index_from_metric(parsed_index):
return None


def extract_dimension_name_and_value(dimension, index):
all_keys = dimension.keys()
if len(all_keys) == 1 and index is not None:
dimension_name = [key for key in all_keys][0]
dimension_values = dimension[dimension_name]
# We need to enrich only table data. Static values like IF-MIB::ifNumber.0 won't be enriched (it doesn't
# make sense for those)
def extract_dimension_name_and_value(dimension_key, dimension_dict, index):
dimension_values = dimension_dict[dimension_key]
# We need to enrich only table data. Static values like IF-MIB::ifNumber.0 won't be enriched (it doesn't
# make sense for those)
if index is not None:
if 0 <= index < len(dimension_values):
return dimension_name, dimension_values[index]
return dimension_key, dimension_values[index]
return None, None


Expand Down Expand Up @@ -67,7 +65,9 @@ def __enrich_if_mib_existing(self, metric_name, parsed_index):
(
dimension_name,
dimension_value,
) = extract_dimension_name_and_value(dimension, index)
) = extract_dimension_name_and_value(
dimension, if_mib_record, index
)
if dimension_name:
result.append({dimension_name: dimension_value})
return result
Expand Down
32 changes: 14 additions & 18 deletions splunk_connect_for_snmp_poller/manager/task_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,28 +182,15 @@ def mib_string_handler(mib_list: list) -> VarbindCollection:
if not mib_list:
return VarbindCollection(get=[], bulk=[])
get_list, bulk_list = [], []
mibBuilder = builder.MibBuilder()
mibViewController = view.MibViewController(mibBuilder)
config = {"sources": [os.environ["MIBS_FILES_URL"]]}
compiler.addMibCompiler(mibBuilder, **config)
for mib_string in mib_list:
try:
if len(mib_string) == 3:
# convert mib string to oid
oid = ObjectIdentity(
mib_string[0], mib_string[1], mib_string[2]
).resolveWithMib(mibViewController)
logger.debug(f"[-] oid: {oid}")
oid = translate_list_to_oid(mib_string)
logger.debug(f"[-] oid: {oid}")
mib_string_length = len(mib_string)
if mib_string_length == 3:
get_list.append(ObjectType(oid))

elif len(mib_string) == 2:
# convert mib string to oid
oid = ObjectIdentity(mib_string[0], mib_string[1]).resolveWithMib(
mibViewController
)
logger.debug(f"[-] oid: {oid}")
elif mib_string_length < 3:
bulk_list.append(ObjectType(oid))

else:
raise Exception(
f"Invalid mib string - {mib_string}."
Expand All @@ -217,6 +204,15 @@ def mib_string_handler(mib_list: list) -> VarbindCollection:
return VarbindCollection(get=get_list, bulk=bulk_list)


def translate_list_to_oid(mib_string):
mibBuilder = builder.MibBuilder()
mibViewController = view.MibViewController(mibBuilder)
config = {"sources": [os.environ["MIBS_FILES_URL"]]}
compiler.addMibCompiler(mibBuilder, **config)
oid = ObjectIdentity(*mib_string).resolveWithMib(mibViewController)
return oid


def snmp_get_handler(
mongo_connection,
enricher_presence,
Expand Down
Loading

0 comments on commit 25e6029

Please sign in to comment.