Skip to content
This repository has been archived by the owner on Dec 17, 2021. It is now read-only.

fix: Remove unused async #188

Merged
merged 3 commits into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 8 additions & 23 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ celery = "^5.1.2"
pymongo = {version = "^3.11.3", extras = ["srv"]}
jsoncomment = "^0.4.2"
aiohttp = "^3.7.4"
asgiref = "^3.4.1"
backoff = "^1.11.1"

[tool.poetry.dev-dependencies]
Expand Down
45 changes: 16 additions & 29 deletions splunk_connect_for_snmp_poller/manager/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
# limitations under the License.
#
import os
import threading

from asgiref.sync import async_to_sync
from celery import Task
from celery.utils.log import get_task_logger
from pysnmp.hlapi import ObjectIdentity, ObjectType, SnmpEngine

Expand All @@ -39,18 +38,9 @@
)
from splunk_connect_for_snmp_poller.mongo import WalkedHostsRepository

thread_local = threading.local()
logger = get_task_logger(__name__)


def get_shared_snmp_engine():
if not hasattr(thread_local, "local_snmp_engine"):
thread_local.local_snmp_engine = SnmpEngine()
logger.info("Created a single shared instance of an SnmpEngine() object")

return thread_local.local_snmp_engine


async def get_snmp_data(
var_binds,
handler,
Expand Down Expand Up @@ -118,8 +108,14 @@ def sort_varbinds(varbind_list: list) -> VarbindCollection:
return casted_multikey_elements


@app.task(ignore_result=True)
class SNMPTask(Task):
def __init__(self):
self.snmp_engine = SnmpEngine()


@app.task(base=SNMPTask, bind=True, ignore_result=True)
def snmp_polling(
self,
ir_json: str,
server_config,
index,
Expand All @@ -129,24 +125,13 @@ def snmp_polling(
ir = InventoryRecord.from_json(ir_json)
logger.info(f"Got one_time_flag - {one_time_flag} with Ir - {ir.__repr__()}")

async_to_sync(snmp_polling_async)(ir, server_config, index, profiles, one_time_flag)

return f"Executing SNMP Polling for {ir.host} version={ir.version} profile={ir.profile}"


async def snmp_polling_async(
ir: InventoryRecord, server_config, index, profiles, one_time_flag: str
):
hec_sender = HecSender(
os.environ["OTEL_SERVER_METRICS_URL"], os.environ["OTEL_SERVER_LOGS_URL"]
)
mib_server_url = os.environ["MIBS_SERVER_URL"]
host, port = parse_port(ir.host)
logger.debug("Using the following MIBS server URL: %s", mib_server_url)

# create one SnmpEngie for snmp_get_handler, walk_handler, mib_string_handler
snmp_engine = get_shared_snmp_engine()

# create auth_data depending on SNMP's version
auth_data = build_authData(ir.version, ir.community, server_config)
logger.debug("auth_data\n%s", auth_data)
Expand All @@ -159,7 +144,7 @@ async def snmp_polling_async(
additional_metric_fields = server_config.get("additionalMetricField")
enricher_presence = "enricher" in server_config
static_parameters = [
snmp_engine,
self.snmp_engine,
hec_sender,
auth_data,
context_data,
Expand Down Expand Up @@ -191,14 +176,14 @@ async def snmp_polling_async(
varbind_collection = sort_varbinds(var_binds)
logger.debug(f"Varbind collection: {varbind_collection}")
# Perform SNMP BULK
await get_snmp_data(
get_snmp_data(
varbind_collection.bulk,
snmp_bulk_handler,
*get_bulk_specific_parameters,
*static_parameters,
)
# Perform SNMP GET
await get_snmp_data(
get_snmp_data(
varbind_collection.get,
snmp_get_handler,
*get_bulk_specific_parameters,
Expand All @@ -219,7 +204,7 @@ async def snmp_polling_async(
host,
OidConstant.IF_MIB,
)
await walk_handler_with_enricher(
walk_handler_with_enricher(
OidConstant.IF_MIB,
server_config,
mongo_connection,
Expand All @@ -231,16 +216,18 @@ async def snmp_polling_async(
host,
ir.profile,
)
await walk_handler(ir.profile, mongo_connection, *static_parameters)
walk_handler(ir.profile, mongo_connection, *static_parameters)
# Perform SNNP GET for an oid
else:
logger.info("Executing SNMP GET for %s profile=%s", host, ir.profile)
prepared_profile = [ObjectType(ObjectIdentity(ir.profile))]
await snmp_get_handler(
snmp_get_handler(
*get_bulk_specific_parameters, *static_parameters, prepared_profile # type: ignore
)

except Exception:
logger.exception(
f"Error occurred while executing SNMP polling for {host}, version={ir.version}, profile={ir.profile}"
)

return f"Executing SNMP Polling for {ir.host} version={ir.version} profile={ir.profile}"