Skip to content

Commit

Permalink
fix(agw): Fixing monitord too many opened files error (#8810) (#8812)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Rodriguez <ardzoht@gmail.com>
  • Loading branch information
ardzoht committed Aug 27, 2021
1 parent 935a89a commit a4ad8b0
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 23 deletions.
12 changes: 7 additions & 5 deletions lte/gateway/python/magma/monitord/cpe_monitoring.py
Expand Up @@ -46,7 +46,6 @@ def _get_addr_from_subscribers(sub_ip) -> str:

class CpeMonitoringModule:
def __init__(self):
# TODO: Save to redis
self._subscriber_state = defaultdict(ICMPMonitoringResponse)
self.ping_addresses = []
self.ping_targets = {}
Expand All @@ -66,6 +65,8 @@ async def get_ping_targets(self, service_loop) -> PingedTargets:
Returns: List of [Subscriber ID => IP address, APN] entries
"""

ping_addresses = self.ping_addresses.copy()
ping_targets = self.ping_targets.copy()
try:
mobilityd_chan = ServiceRegistry.get_rpc_channel('mobilityd',
ServiceRegistry.LOCAL)
Expand All @@ -75,12 +76,13 @@ async def get_ping_targets(self, service_loop) -> PingedTargets:
10), service_loop)
for sub in response.entries:
ip = _get_addr_from_subscribers(sub.ip)
self.ping_addresses.append(ip)
self.ping_targets[sub.sid.id] = ip
ping_addresses.append(ip)
ping_targets[sub.sid.id] = ip
except grpc.RpcError as err:
logging.error(
"GetSubscribers Error for %s! %s", err.code(), err.details())
return PingedTargets(self.ping_targets, self.ping_addresses)
"GetSubscribers Error for %s! %s", err.code(), err.details(),
)
return PingedTargets(ping_targets, ping_addresses)

def save_ping_response(self, sid: str, ip_addr: str,
ping_resp: PingCommandResult) -> None:
Expand Down
Expand Up @@ -21,13 +21,24 @@
ping_interface_async,
)

NUM_PACKETS = 4
NUM_PACKETS = 2
DEFAULT_POLLING_INTERVAL = 60
TIMEOUT_SECS = 10
TIMEOUT_SECS = 3
CHECKIN_INTERVAL = 10
CHUNK_SIZE = 100


class ICMPMonitoring(Job):
def _chunk_targets(hosts: List[str]):
"""
Yields successive n-sized chunks from target hosts.
"""
for i in range(0, len(hosts), CHUNK_SIZE):
logging.debug(
'Yielding [%s:%s] from target hosts', i, i + CHUNK_SIZE)
yield hosts[i:i + CHUNK_SIZE]


class ICMPJob(Job):
"""
Class that handles main loop to send ICMP ping to valid subscribers.
"""
Expand All @@ -42,6 +53,7 @@ def __init__(self, monitoring_module, polling_interval: int, service_loop,
DEFAULT_POLLING_INTERVAL)
self._loop = service_loop
self._module = monitoring_module
self._sem = asyncio.BoundedSemaphore(5)

async def _ping_targets(self, hosts: List[str],
targets: Optional[Dict] = None):
Expand All @@ -54,13 +66,22 @@ async def _ping_targets(self, hosts: List[str],
Returns: (stdout, stderr)
"""
if targets:
ping_params = [
PingInterfaceCommandParams(host, NUM_PACKETS, self._MTR_PORT,
TIMEOUT_SECS) for host in hosts]
ping_results = await ping_interface_async(ping_params, self._loop)
ping_results_list = list(ping_results)
for host, sub, result in zip(hosts, targets, ping_results_list):
self._save_ping_response(sub, host, result)
for chunked_hosts in _chunk_targets(hosts):
ping_params = [
PingInterfaceCommandParams(
host, NUM_PACKETS, self._MTR_PORT,
TIMEOUT_SECS,
) for host in chunked_hosts
]
async with self._sem:
try:
ping_results = await ping_interface_async(ping_params, self._loop)
ping_results_list = list(ping_results)
for host, sub, result in zip(hosts, targets, ping_results_list):
self._save_ping_response(sub, host, result)
except OSError:
logging.warning('Too many connections opened, sleeping while connections are closed...')
await asyncio.sleep(TIMEOUT_SECS, self._loop)

def _save_ping_response(self, target_id: str, ip_addr: str,
ping_resp: PingCommandResult) -> None:
Expand Down
10 changes: 6 additions & 4 deletions lte/gateway/python/magma/monitord/main.py
Expand Up @@ -19,7 +19,7 @@
from magma.common.service import MagmaService
from magma.configuration import load_service_config
from magma.monitord.cpe_monitoring import CpeMonitoringModule
from magma.monitord.icmp_monitoring import ICMPMonitoring
from magma.monitord.icmp_job import ICMPJob
from magma.monitord.icmp_state import serialize_subscriber_states


Expand Down Expand Up @@ -55,9 +55,11 @@ def main():
cpe_monitor = CpeMonitoringModule()
cpe_monitor.set_manually_configured_targets(manual_ping_targets)

icmp_monitor = ICMPMonitoring(cpe_monitor,
service.mconfig.polling_interval,
service.loop, mtr_interface)
icmp_monitor = ICMPJob(
cpe_monitor,
service.mconfig.polling_interval,
service.loop, mtr_interface,
)
icmp_monitor.start()

# Register a callback function for GetOperationalStates
Expand Down
10 changes: 6 additions & 4 deletions lte/gateway/python/magma/monitord/tests/test_icmp_monitor.py
Expand Up @@ -17,7 +17,7 @@

from lte.protos.mobilityd_pb2 import IPAddress, SubscriberIPTable
from magma.monitord.cpe_monitoring import CpeMonitoringModule
from magma.monitord.icmp_monitoring import ICMPMonitoring
from magma.monitord.icmp_job import ICMPJob

LOCALHOST = '127.0.0.1'

Expand All @@ -41,9 +41,11 @@ def setUp(self):
self.loop = asyncio.get_event_loop()
self.subscribers = {}
self.obj = CpeMonitoringModule()
self._monitor = ICMPMonitoring(self.obj, polling_interval=5,
service_loop=self.loop,
mtr_interface=LOCALHOST)
self._monitor = ICMPJob(
self.obj, polling_interval=5,
service_loop=self.loop,
mtr_interface=LOCALHOST,
)

def test_ping_subscriber_saves_response(self):
imsi = 'IMSI00000000001'
Expand Down

0 comments on commit a4ad8b0

Please sign in to comment.