diff --git a/splunk_connect_for_snmp_poller/manager/tasks.py b/splunk_connect_for_snmp_poller/manager/tasks.py index 340a534..95b024e 100644 --- a/splunk_connect_for_snmp_poller/manager/tasks.py +++ b/splunk_connect_for_snmp_poller/manager/tasks.py @@ -15,7 +15,6 @@ # import os -from asgiref.sync import async_to_sync from celery import Task from celery.utils.log import get_task_logger from pysnmp.hlapi import ObjectIdentity, ObjectType, SnmpEngine @@ -108,14 +107,15 @@ def sort_varbinds(varbind_list: list) -> VarbindCollection: casted_multikey_elements += VarbindCollection(get=get_list, bulk=bulk_list) return casted_multikey_elements + class SNMPTask(Task): - def __init__(self): self.snmp_engine = SnmpEngine() -@app.task(bind=True,ignore_result=True) -def snmp_polling(self, +@app.task(base=SNMPTask, bind=True, ignore_result=True) +def snmp_polling( + self, ir_json: str, server_config, index, @@ -125,14 +125,6 @@ def snmp_polling(self, ir = InventoryRecord.from_json(ir_json) logger.info(f"Got one_time_flag - {one_time_flag} with Ir - {ir.__repr__()}") - async_to_sync(snmp_polling_async)(ir, server_config, index, profiles, one_time_flag) - - return f"Executing SNMP Polling for {ir.host} version={ir.version} profile={ir.profile}" - - -async def snmp_polling_async( - ir: InventoryRecord, server_config, index, profiles, one_time_flag: str -): hec_sender = HecSender( os.environ["OTEL_SERVER_METRICS_URL"], os.environ["OTEL_SERVER_LOGS_URL"] ) @@ -184,14 +176,14 @@ async def snmp_polling_async( varbind_collection = sort_varbinds(var_binds) logger.debug(f"Varbind collection: {varbind_collection}") # Perform SNMP BULK - await get_snmp_data( + get_snmp_data( varbind_collection.bulk, snmp_bulk_handler, *get_bulk_specific_parameters, *static_parameters, ) # Perform SNMP GET - await get_snmp_data( + get_snmp_data( varbind_collection.get, snmp_get_handler, *get_bulk_specific_parameters, @@ -212,7 +204,7 @@ async def snmp_polling_async( host, OidConstant.IF_MIB, ) - await walk_handler_with_enricher( + walk_handler_with_enricher( OidConstant.IF_MIB, server_config, mongo_connection, @@ -224,12 +216,12 @@ async def snmp_polling_async( host, ir.profile, ) - await walk_handler(ir.profile, mongo_connection, *static_parameters) + walk_handler(ir.profile, mongo_connection, *static_parameters) # Perform SNNP GET for an oid else: logger.info("Executing SNMP GET for %s profile=%s", host, ir.profile) prepared_profile = [ObjectType(ObjectIdentity(ir.profile))] - await snmp_get_handler( + snmp_get_handler( *get_bulk_specific_parameters, *static_parameters, prepared_profile # type: ignore ) @@ -237,3 +229,5 @@ async def snmp_polling_async( logger.exception( f"Error occurred while executing SNMP polling for {host}, version={ir.version}, profile={ir.profile}" ) + + return f"Executing SNMP Polling for {ir.host} version={ir.version} profile={ir.profile}"