This repository has been archived by the owner on Dec 17, 2021. It is now read-only.

Commit: Feature/dynamic profile (#138)
* feat: dynamic profile assignment v1

* feat: dynamic profile assignment v1

* fix: style

* fix: added timeout

* fix: fix

* fix: added unit tests

* fix: cleanup

* fix: cleanup

* fix: cleanup

* fix: cleanup

* fix: PR comments

* fix: pre-commit

* fix: pre-commit

* fix: PR comments 2

* fix: PR comments 2
weliasz committed Oct 11, 2021
1 parent b50480f commit bf2293d
Showing 14 changed files with 517 additions and 99 deletions.
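
This change introduces dynamic profile assignment: when a host's profile in the inventory is the wildcard "*", the poller removes any statically scheduled jobs for that host and queues it for profile matching against its device description (see poller.py below). A hypothetical inventory row — the host, version, community, profile, frequency column order is an assumption inferred from the InventoryRecord constructor calls in this diff:

10.0.0.1,2c,public,*,60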
44 changes: 44 additions & 0 deletions .run/scheduler.run.xml
@@ -0,0 +1,44 @@
<!--
~ Copyright 2021 Splunk Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="scheduler" type="PythonConfigurationType" factoryName="Python">
<module name="splunk-connect-for-snmp-poller" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
<env name="CELERY_BROKER_URL" value="localhost" />
<env name="MONGO_SERVICE_SERVICE_HOST" value="localhost" />
<env name="MONGO_SERVICE_SERVICE_PORT" value="27017" />
<env name="MIBS_SERVER_URL" value="http://localhost:5000" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/splunk_connect_for_snmp_poller" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="$PROJECT_DIR$/splunk_connect_for_snmp_poller/snmp_poller_server.py" />
<option name="PARAMETERS" value="-c ../config.yaml -i ../inventory.csv --event_index em_events --metric_index em_metrics --meta_index em_meta --loglevel debug" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
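
The run configuration above is IDE-specific. To reproduce the same environment when launching snmp_poller_server.py by hand, the variables can be set before the poller modules are imported; a minimal sketch — all values are simply the local defaults from the XML above, not project requirements:

import os

# Mirror the environment block of .run/scheduler.run.xml for a local run.
os.environ.setdefault("PYTHONUNBUFFERED", "1")
os.environ.setdefault("CELERY_BROKER_URL", "localhost")
os.environ.setdefault("MONGO_SERVICE_SERVICE_HOST", "localhost")
os.environ.setdefault("MONGO_SERVICE_SERVICE_PORT", "27017")
os.environ.setdefault("MIBS_SERVER_URL", "http://localhost:5000")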
13 changes: 13 additions & 0 deletions splunk_connect_for_snmp_poller/manager/mib_server_client.py
@@ -29,6 +29,7 @@ async def get_translation(var_binds, mib_server_url, data_format):
"""
@param var_binds: var_binds object getting from SNMP agents
@param mib_server_url: URL of SNMP MIB server
    @param data_format: requested format of the translated data
@return: translated string
"""
payload = await prepare_payload(var_binds)
@@ -102,3 +103,15 @@ def format_value_for_mib_server(value, value_type):
return value.prettyPrint()
else:
return str(value)


def get_mib_profiles():
    mib_server_url = os.environ["MIBS_SERVER_URL"]
    endpoint = "profiles"
    # Build the URL with string formatting; os.path.join would use a
    # backslash separator on Windows, which is wrong for URLs.
    profiles_url = f"{mib_server_url.rstrip('/')}/{endpoint}"

    try:
        return requests.get(profiles_url, timeout=3).text
    except Exception:
        logger.exception("Error getting profiles from MIB server")
        # Keep the failure return type consistent with the success path
        # (a string), so callers can parse the response uniformly.
        return "{}"
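
The get_profiles helper imported by poller.py lives in profile_matching.py, one of the 14 changed files not shown in this excerpt. How it merges the MIB server's dynamic profiles with the static ones from config.yaml is an assumption here; a minimal sketch, assuming the endpoint returns YAML of the form {"profiles": {...}}:

import yaml

from splunk_connect_for_snmp_poller.manager.mib_server_client import (
    get_mib_profiles,
)


def get_profiles(server_config):
    # Hypothetical merge: start from the static profiles in config.yaml,
    # then overlay whatever the MIB server currently serves.
    profiles = dict(server_config.get("profiles", {}))
    dynamic = yaml.safe_load(get_mib_profiles()) or {}
    profiles.update(dynamic.get("profiles", {}))
    # poller.py indexes the result as profiles["profiles"].
    return {"profiles": profiles}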
127 changes: 105 additions & 22 deletions splunk_connect_for_snmp_poller/manager/poller.py
@@ -15,18 +15,24 @@
#
import functools
import logging.config
import threading
import time

import schedule
from pysnmp.hlapi import SnmpEngine

from splunk_connect_for_snmp_poller.manager.data.inventory_record import InventoryRecord
from splunk_connect_for_snmp_poller.manager.poller_utilities import (
automatic_realtime_task,
automatic_realtime_job,
create_poller_scheduler_entry_key,
parse_inventory_file,
return_database_id,
)
from splunk_connect_for_snmp_poller.manager.profile_matching import (
assign_profiles_to_device,
extract_desc,
get_profiles,
)
from splunk_connect_for_snmp_poller.manager.tasks import snmp_polling
from splunk_connect_for_snmp_poller.mongo import WalkedHostsRepository
from splunk_connect_for_snmp_poller.utilities import (
@@ -38,9 +44,6 @@


class Poller:
# see https://www.alvestrand.no/objectid/1.3.6.1.html for a better understanding
universal_base_oid = "1.3.6.1.*"

def __init__(self, args, server_config):
self._args = args
self._server_config = server_config
@@ -51,6 +54,8 @@ def __init__(self, args, server_config):
self._server_config["mongo"]
)
self._local_snmp_engine = SnmpEngine()
self._unmatched_devices = {}
self._lock = threading.Lock()

def __get_splunk_indexes(self):
return {
@@ -59,9 +64,6 @@ def __get_splunk_indexes(self):
"meta_index": self._args.meta_index,
}

def __get_realtime_task_frequency(self):
return self._args.realtime_task_frequency

def run(self):
self.__start_realtime_scheduler_task()
counter = 0
@@ -88,7 +90,8 @@ def __check_inventory(self):
# update job when either inventory changes or config changes
if server_config_modified or inventory_config_modified:
inventory_hosts = set()
for ir in parse_inventory_file(self._args.inventory):
profiles = get_profiles(self._server_config)
for ir in parse_inventory_file(self._args.inventory, profiles):
entry_key = create_poller_scheduler_entry_key(ir.host, ir.profile)
if entry_key in inventory_hosts:
logger.error(
@@ -99,17 +102,28 @@
ir.profile,
)
continue

inventory_hosts.add(entry_key)
logger.debug(
"[-] server_config['profiles']: %s", self._server_config["profiles"]
)
if entry_key not in self._jobs_map:
self.process_new_job(entry_key, ir)
if ir.profile == "*":
self.delete_all_entries_per_host(ir.host)
self.add_device_for_profile_matching(ir)
else:
self.update_schedule_for_changed_conf(entry_key, ir)
logger.debug(
"[-] server_config['profiles']: %s",
self._server_config["profiles"],
)
if entry_key not in self._jobs_map:
self.process_new_job(entry_key, ir, profiles)
else:
self.update_schedule_for_changed_conf(entry_key, ir, profiles)

self.clean_job_inventory(inventory_hosts)

def delete_all_entries_per_host(self, host):
for entry_key in list(self._jobs_map.keys()):
if entry_key.split("#")[0] == host:
schedule.cancel_job(self._jobs_map.get(entry_key))
del self._jobs_map[entry_key]

def clean_job_inventory(self, inventory_hosts):
for entry_key in list(self._jobs_map):
if entry_key not in inventory_hosts:
@@ -120,10 +134,12 @@ def clean_job_inventory(self, inventory_hosts):
self._mongo_walked_hosts_coll.delete_host(db_host_id)
del self._jobs_map[entry_key]

def update_schedule_for_changed_conf(self, entry_key, ir):
def update_schedule_for_changed_conf(self, entry_key, ir, profiles):
old_conf = self._jobs_map.get(entry_key).job_func.args
if self.is_conf_changed(entry_key, ir, old_conf):
self.__update_schedule(ir, self._server_config, self.__get_splunk_indexes())
self.__update_schedule(
ir, self._server_config, self.__get_splunk_indexes(), profiles
)

def is_conf_changed(self, entry_key, ir, old_conf):
interval = self._jobs_map.get(entry_key).interval
@@ -135,17 +151,18 @@ def is_conf_changed(self, entry_key, ir, old_conf):
or frequency != interval
)

def process_new_job(self, entry_key, ir):
def process_new_job(self, entry_key, ir, profiles):
logger.debug("Adding configuration for job %s", entry_key)
job_reference = schedule.every(int(ir.frequency_str)).seconds.do(
scheduled_task,
ir,
self._server_config,
self.__get_splunk_indexes(),
profiles,
)
self._jobs_map[entry_key] = job_reference

def __update_schedule(self, ir, server_config, splunk_indexes):
def __update_schedule(self, ir, server_config, splunk_indexes, profiles):
entry_key = ir.host + "#" + ir.profile

logger.debug("Updating configuration for job %s", entry_key)
@@ -154,6 +171,7 @@ def __update_schedule(self, ir, server_config, splunk_indexes):
ir,
server_config,
splunk_indexes,
profiles,
)
functools.update_wrapper(new_job_func, scheduled_task)

@@ -171,15 +189,80 @@ def __start_realtime_scheduler_task(self):
        # For debugging purposes it is better to run the job every
        # second: schedule.every().second.do(...)
schedule.every(self._args.realtime_task_frequency).seconds.do(
automatic_realtime_task,
automatic_realtime_job,
self._mongo_walked_hosts_coll,
self._args.inventory,
self.__get_splunk_indexes(),
self._server_config,
self._local_snmp_engine,
)

schedule.every(self._args.matching_task_frequency).seconds.do(
self.process_unmatched_devices_job,
self._server_config,
)

automatic_realtime_job(
self._mongo_walked_hosts_coll,
self._args.inventory,
self.__get_splunk_indexes(),
self._server_config,
self._local_snmp_engine,
)

    def add_device_for_profile_matching(self, device: InventoryRecord):
        # "with" releases the lock even if the insertion raises.
        with self._lock:
            self._unmatched_devices[device.host] = device

def process_unmatched_devices_job(self, server_config):
job_thread = threading.Thread(
target=self.process_unmatched_devices, args=[server_config]
)
job_thread.start()

    def process_unmatched_devices(self, server_config):
        if self._unmatched_devices:
            # Acquire the lock before the try block so that the finally
            # clause never releases a lock this thread does not hold.
            self._lock.acquire()
            try:
                profiles = get_profiles(server_config)
                processed_devices = set()
for host, device in self._unmatched_devices.items():
realtime_collection = (
self._mongo_walked_hosts_coll.real_time_data_for(
return_database_id(host)
)
)
if realtime_collection:
descr = extract_desc(realtime_collection)

if descr:
assigned_profiles = assign_profiles_to_device(
profiles["profiles"], descr
)
processed_devices.add(host)

for profile, frequency in assigned_profiles:
entry_key = create_poller_scheduler_entry_key(
host, profile
)
new_record = InventoryRecord(
host,
device.version,
device.community,
profile,
frequency,
)
self.process_new_job(entry_key, new_record, profiles)
for d in processed_devices:
self._unmatched_devices.pop(d)
            except Exception:
                logger.exception("Error processing unmatched devices")
            finally:
                self._lock.release()


def scheduled_task(ir: InventoryRecord, server_config, splunk_indexes):
def scheduled_task(ir: InventoryRecord, server_config, splunk_indexes, profiles):
logger.debug("Executing scheduled_task for %s", ir.__repr__())
snmp_polling.delay(ir.to_json(), server_config, splunk_indexes)
snmp_polling.delay(ir.to_json(), server_config, splunk_indexes, profiles)
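
The matching logic itself is in profile_matching.py, also among the changed files but not shown above. Its call sites pin down the contract: extract_desc pulls a device description out of the real-time collection, and assign_profiles_to_device(profiles["profiles"], descr) yields (profile, frequency) pairs. The "patterns" key and regex semantics below are assumptions, not the committed implementation:

import re


def assign_profiles_to_device(profiles, descr):
    # Hypothetical matcher: assign every profile whose "patterns" list
    # contains a regex matching the device description (e.g. sysDescr).
    assigned = []
    for name, profile in profiles.items():
        patterns = profile.get("patterns") or []
        if any(re.search(pattern, str(descr)) for pattern in patterns):
            assigned.append((name, profile["frequency"]))
    return assigned

Each assigned pair then becomes a regular scheduled job via process_new_job, exactly as a static inventory entry would.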
(Diffs for the remaining 11 changed files are omitted.)
