Commit 7ef1a09: refactor

okuzhel committed Sep 14, 2021
1 parent 1b07dd8

Showing 14 changed files with 469 additions and 289 deletions.
8 changes: 4 additions & 4 deletions BUILD_STEPS.md
@@ -70,11 +70,11 @@ snmpsimd.py --data-dir=data --agent-udpv4-endpoint=0.0.0.0:1161
```docker run -d -p 27017:27017 -v ./data:/data/db mongo```


-## Steps to run each components including poller scheduler and worker.
+## Steps to run each component including poller scheduler and worker.

Note:
-- Ensure you follow the order of iniatialization to avoid connectivity issues!
-- This assumes MongoDB and RabbitMQ are configred in config.yaml of respective component.
+- Ensure you follow the order of initialization to avoid connectivity issues!
+- This assumes MongoDB and RabbitMQ are configured in config.yaml of respective component.

#### Run MIB / Translation Server

@@ -108,7 +108,7 @@ host,version,community,profile,freqinseconds

#### Inventory Fields

-- **host**: is IP adress or FQDN of the SNMP Server
+- **host**: is IP address or FQDN of the SNMP Server

- **version**: SNMP server version (i.e. 1, 2c, 3)

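For reference, a complete inventory entry following the `host,version,community,profile,freqinseconds` header shown in the hunk above could look like this (all values illustrative, not taken from this commit):

```
10.0.101.22,2c,public,router_profile,60
```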
280 changes: 164 additions & 116 deletions poetry.lock

Large diffs are not rendered by default.

54 changes: 54 additions & 0 deletions splunk_connect_for_snmp_poller/manager/data/event_builder.py
@@ -0,0 +1,54 @@
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from enum import Enum
from typing import Any, Dict

from celery.utils.log import get_logger

logger = get_logger(__name__)


class EventBuilder:
    def __init__(self) -> None:
        self.data: Dict = {}

    def add(self, field, part: Any) -> None:
        self.data[field.value] = part

    def is_one_time_walk(self, one_time_flag: bool) -> None:
        if one_time_flag:
            self.data[EventField.SOURCETYPE.value] = EventType.WALK.value

    def build(self) -> dict:
        logger.debug("--------data------\n%s", self.data)
        return self.data


class EventField(Enum):
    TIME = "time"
    SOURCETYPE = "sourcetype"
    HOST = "host"
    INDEX = "index"
    EVENT = "event"
    FREQUENCY = "frequency"
    PROFILE = "profile"
    FIELDS = "fields"


class EventType(Enum):
    EVENT = "sc4snmp:meta"
    METRIC = "metric"
    WALK = "sc4snmp:walk"
    ERROR = "sc4snmp:error"
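
As a quick illustration of the new builder (not part of the commit; the host, index, and timestamp values are made up), it composes the same dict that hec_sender.py previously built by hand:

```python
builder = EventBuilder()
builder.add(EventField.TIME, 1631606400.0)     # illustrative epoch timestamp
builder.add(EventField.HOST, "10.0.101.22")    # illustrative SNMP host
builder.add(EventField.INDEX, "snmp_metrics")  # illustrative Splunk index
builder.add(EventField.SOURCETYPE, EventType.EVENT.value)
builder.is_one_time_walk(True)                 # overrides sourcetype with "sc4snmp:walk"
data = builder.build()
# data == {"time": 1631606400.0, "host": "10.0.101.22",
#          "index": "snmp_metrics", "sourcetype": "sc4snmp:walk"}
```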
28 changes: 28 additions & 0 deletions splunk_connect_for_snmp_poller/manager/data/inventory_record.py
@@ -0,0 +1,28 @@
# Copyright 2021 Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
from dataclasses import dataclass


@dataclass
class InventoryRecord:
    host: str
    version: str
    community: str
    profile: str
    frequency_str: str

    def toJson(self):
        return json.dumps(self, default=lambda o: o.__dict__)
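
A short sketch of how the new dataclass serializes (illustrative values; the round trip shown is an assumption about how a consumer would rebuild the record, not code from this commit):

```python
import json

ir = InventoryRecord("10.0.101.22", "2c", "public", "router_profile", "60")
payload = ir.toJson()
# payload: '{"host": "10.0.101.22", "version": "2c", "community": "public",
#            "profile": "router_profile", "frequency_str": "60"}'

# Hypothetical consumer side: rebuild the record from the JSON string.
restored = InventoryRecord(**json.loads(payload))
assert restored == ir  # dataclass equality compares all fields
```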
75 changes: 41 additions & 34 deletions splunk_connect_for_snmp_poller/manager/hec_sender.py
@@ -19,6 +19,11 @@
import requests
from celery.utils.log import get_logger

+from splunk_connect_for_snmp_poller.manager.data.event_builder import (
+    EventBuilder,
+    EventField,
+    EventType,
+)
from splunk_connect_for_snmp_poller.manager.static.mib_enricher import MibEnricher

logger = get_logger(__name__)
@@ -57,42 +62,48 @@ def post_data_to_splunk_hec(
def post_event_data(
    endpoint, host, variables_binds, indexes, one_time_flag=False, mib_enricher=None
):
-    if "NoSuchInstance" in str(variables_binds):
-        variables_binds = "error: " + str(variables_binds)
+    variables_binds = prepare_variable_binds(mib_enricher, variables_binds)

-    elif mib_enricher:
-        variables_binds = _enrich_event_data(mib_enricher, json.loads(variables_binds))
-    elif "non_metric" in variables_binds:
-        variables_binds = json.loads(variables_binds)["non_metric"]

-    data = {
-        "time": time.time(),
-        "sourcetype": "sc4snmp:meta",
-        "host": host,
-        "index": indexes["meta_index"],
-        "event": str(variables_binds),
-    }
-
-    if one_time_flag:
-        data["sourcetype"] = "sc4snmp:walk"
+    builder = init_builder_with_common_data(time.time(), host, indexes["meta_index"])
+    builder.add(EventField.SOURCETYPE, EventType.EVENT.value)
+    builder.add(EventField.EVENT, str(variables_binds))
+    builder.is_one_time_walk(one_time_flag)

    if "error" in str(variables_binds):
-        data["index"] = indexes["event_index"]
-        data["sourcetype"] = "sc4snmp:error"
+        builder.add(EventField.INDEX, indexes["event_index"])
+        builder.add(EventField.SOURCETYPE, EventType.ERROR.value)

-    logger.debug(f"+++++++++data+++++++++\n{data}")
+    data = builder.build()

    try:
-        logger.debug(f"+++++++++endpoint+++++++++\n{endpoint}")
+        logger.debug("+++++++++endpoint+++++++++\n%s", endpoint)
        response = requests.post(url=endpoint, json=data, timeout=60)
-        logger.debug(f"Response code is {response.status_code}")
-        logger.debug(f"Response is {response.text}")
+        logger.debug("Response code is %s", response.status_code)
+        logger.debug("Response is %s", response.text)
    except requests.ConnectionError as e:
        logger.error(
            f"Connection error when sending data to HEC index - {data['index']}: {e}"
        )


+def init_builder_with_common_data(time, host, index) -> EventBuilder:
+    builder = EventBuilder()
+    builder.add(EventField.TIME, time)
+    builder.add(EventField.HOST, host)
+    builder.add(EventField.INDEX, index)
+    return builder


+def prepare_variable_binds(mib_enricher, variables_binds):
+    if "NoSuchInstance" in str(variables_binds):
+        variables_binds = "error: " + str(variables_binds)
+    elif mib_enricher:
+        variables_binds = _enrich_event_data(mib_enricher, json.loads(variables_binds))
+    elif "non_metric" in variables_binds:
+        variables_binds = json.loads(variables_binds)["non_metric"]
+    return variables_binds


def _enrich_event_data(mib_enricher: MibEnricher, variables_binds: dict) -> str:
    """
    This function serves for processing event data the way we add additional dimensions configured in enricher config.
@@ -131,21 +142,17 @@ def post_metric_data(endpoint, host, variables_binds, index, mib_enricher=None):
    if mib_enricher:
        _enrich_metric_data(mib_enricher, json_val, fields)

-    data = {
-        "time": time.time(),
-        "host": host,
-        "index": index,
-        "event": "metric",
-        "fields": fields,
-    }
+    builder = init_builder_with_common_data(time.time(), host, index)
+    builder.add(EventField.EVENT, EventType.METRIC.value)
+    builder.add(EventField.FIELDS, fields)

-    logger.debug(f"--------data------\n{data}")
+    data = builder.build()

    try:
-        logger.debug(f"-----endpoint------\n{endpoint}")
+        logger.debug("-----endpoint------\n%s", endpoint)
        response = requests.post(url=endpoint, json=data, timeout=60)
-        logger.debug(f"Response code is {response.status_code}")
-        logger.debug(f"Response is {response.text}")
+        logger.debug("Response code is %s", response.status_code)
+        logger.debug("Response is %s", response.text)
    except requests.ConnectionError as e:
        logger.error(f"Connection error when sending data to HEC index - {index}: {e}")

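To make the refactor concrete, here is roughly what post_metric_data now assembles via the builder (an editor's sketch with made-up host, index, and metric values, not code from the commit):

```python
builder = init_builder_with_common_data(time.time(), "10.0.101.22", "snmp_metrics")
builder.add(EventField.EVENT, EventType.METRIC.value)
builder.add(EventField.FIELDS, {"metric_name:sc4snmp.IF-MIB.ifInOctets": 42})
data = builder.build()
# data matches the dict literal removed above:
# {"time": ..., "host": "10.0.101.22", "index": "snmp_metrics",
#  "event": "metric", "fields": {...}}
```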
126 changes: 55 additions & 71 deletions splunk_connect_for_snmp_poller/manager/poller.py
@@ -20,6 +20,7 @@
import schedule
from pysnmp.hlapi import SnmpEngine

+from splunk_connect_for_snmp_poller.manager.data.inventory_record import InventoryRecord
from splunk_connect_for_snmp_poller.manager.poller_utilities import (
    automatic_realtime_task,
    create_poller_scheduler_entry_key,
@@ -89,89 +90,75 @@ def __check_inventory(self):
        inventory_hosts = set()
        for ir in parse_inventory_file(self._args.inventory):
            entry_key = create_poller_scheduler_entry_key(ir.host, ir.profile)
-            frequency = int(ir.frequency_str)
            if entry_key in inventory_hosts:
                logger.error(
-                    f"{ir.host},{ir.version},{ir.community},{ir.profile},{ir.frequency_str} has duplicated "
-                    f"hostname {ir.host} and {ir.profile} in the inventory,"
-                    f" cannot use the same profile twice for the same device"
+                    "%s has duplicated hostname %s and %s in the inventory, cannot use the same profile twice for "
+                    "the same device",
+                    ir.__repr__(),
+                    ir.host,
+                    ir.profile,
                )
                continue

            inventory_hosts.add(entry_key)
            logger.info(
-                f"[-] server_config['profiles']: {self._server_config['profiles']}"
+                "[-] server_config['profiles']: %s", self._server_config["profiles"]
            )
            if entry_key not in self._jobs_map:
-                logger.debug(f"Adding configuration for job {entry_key}")
-                job_reference = schedule.every(frequency).seconds.do(
-                    scheduled_task,
-                    ir.host,
-                    ir.version,
-                    ir.community,
-                    ir.profile,
-                    self._server_config,
-                    self.__get_splunk_indexes(),
-                )
-                self._jobs_map[entry_key] = job_reference
+                self.process_new_job(entry_key, ir)
            else:
-                old_conf = self._jobs_map.get(entry_key).job_func.args
-                if (
-                    old_conf
-                    != (
-                        ir.host,
-                        ir.version,
-                        ir.community,
-                        ir.profile,
-                        self._server_config,
-                        self.__get_splunk_indexes(),
-                    )
-                    or frequency != self._jobs_map.get(entry_key).interval
-                ):
-                    self.__update_schedule(
-                        ir.community,
-                        ir.frequency,
-                        ir.host,
-                        ir.profile,
-                        ir.version,
-                        self._server_config,
-                        self.__get_splunk_indexes(),
-                    )
-        for entry_key in list(self._jobs_map):
-            if entry_key not in inventory_hosts:
-                logger.debug(f"Removing job for {entry_key}")
-                schedule.cancel_job(self._jobs_map.get(entry_key))
-                db_host_id = return_database_id(entry_key)
-                logger.debug(f"Removing _id {db_host_id} from mongo database")
-                self._mongo_walked_hosts_coll.delete_host(db_host_id)
-                del self._jobs_map[entry_key]
-
-    def __update_schedule(
-        self,
-        community,
-        frequency,
-        host,
-        profile,
-        version,
-        server_config,
-        splunk_indexes,
-    ):
-        entry_key = host + "#" + profile
-
-        logger.debug(f"Updating configuration for job {entry_key}")
+                self.update_schedule_for_changed_conf(entry_key, ir)
+        self.clean_job_inventory(inventory_hosts)

+    def clean_job_inventory(self, inventory_hosts):
+        for entry_key in list(self._jobs_map):
+            if entry_key not in inventory_hosts:
+                logger.debug("Removing job for %s", entry_key)
+                schedule.cancel_job(self._jobs_map.get(entry_key))
+                db_host_id = return_database_id(entry_key)
+                logger.debug("Removing _id %s from mongo database", db_host_id)
+                self._mongo_walked_hosts_coll.delete_host(db_host_id)
+                del self._jobs_map[entry_key]

+    def update_schedule_for_changed_conf(self, entry_key, ir):
+        old_conf = self._jobs_map.get(entry_key).job_func.args
+        if self.is_conf_changed(entry_key, ir, old_conf):
+            self.__update_schedule(ir, self._server_config, self.__get_splunk_indexes())

+    def is_conf_changed(self, entry_key, ir, old_conf):
+        interval = self._jobs_map.get(entry_key).interval
+        config = self._server_config
+        indexes = self.__get_splunk_indexes()
+        frequency = int(ir.frequency_str)
+        return (
+            old_conf != (ir.host, ir.version, ir.community, ir.profile, config, indexes)
+            or frequency != interval
+        )

+    def process_new_job(self, entry_key, ir):
+        logger.debug("Adding configuration for job %s", entry_key)
+        job_reference = schedule.every(int(ir.frequency_str)).seconds.do(
+            scheduled_task,
+            ir,
+            self._server_config,
+            self.__get_splunk_indexes(),
+        )
+        self._jobs_map[entry_key] = job_reference

+    def __update_schedule(self, ir, server_config, splunk_indexes):
+        entry_key = ir.host + "#" + ir.profile

+        logger.debug("Updating configuration for job %s", entry_key)
        new_job_func = functools.partial(
            scheduled_task,
-            host,
-            version,
-            community,
-            profile,
+            ir,
            server_config,
            splunk_indexes,
        )
        functools.update_wrapper(new_job_func, scheduled_task)

        self._jobs_map.get(entry_key).job_func = new_job_func
-        self._jobs_map.get(entry_key).interval = frequency
+        self._jobs_map.get(entry_key).interval = int(ir.frequency_str)
        old_next_run = self._jobs_map.get(entry_key).next_run
        self._jobs_map.get(entry_key)._schedule_next_run()
        new_next_run = self._jobs_map.get(entry_key).next_run
@@ -193,9 +180,6 @@ def __start_realtime_scheduler_task(self):
)


-def scheduled_task(host, version, community, profile, server_config, splunk_indexes):
-    logger.debug(
-        f"Executing scheduled_task for {host} version={version} community={community} profile={profile}"
-    )
-
-    snmp_polling.delay(host, version, community, profile, server_config, splunk_indexes)
+def scheduled_task(ir: InventoryRecord, server_config, splunk_indexes):
+    logger.debug("Executing scheduled_task for %s", ir.__repr__())
+    snmp_polling.delay(ir.toJson(), server_config, splunk_indexes)
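
Note that scheduled_task now passes a single JSON string to the Celery task instead of five positional arguments. The matching worker-side change is not part of this diff; a receiving task would presumably rebuild the record along these lines (hypothetical sketch only):

```python
import json

from splunk_connect_for_snmp_poller.manager.data.inventory_record import InventoryRecord

# Hypothetical worker-side task body matching the new delay(...) call;
# the actual worker change is outside this commit.
def snmp_polling(ir_json: str, server_config, splunk_indexes):
    ir = InventoryRecord(**json.loads(ir_json))
    # ... poll ir.host using ir.version / ir.community / ir.profile as before ...
```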