diff --git a/synapse/cortex.py b/synapse/cortex.py index d346d5967b..e7e8c19a0a 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -751,6 +751,22 @@ async def getStormDmons(self): async def getStormDmonLog(self, iden): return await self.cell.getStormDmonLog(iden) + @s_cell.adminapi() + async def getStormDmon(self, iden): + return await self.cell.getStormDmon(iden) + + @s_cell.adminapi() + async def bumpStormDmon(self, iden): + return await self.cell.bumpStormDmon(iden) + + @s_cell.adminapi() + async def disableStormDmon(self, iden): + return await self.cell.disableStormDmon(iden) + + @s_cell.adminapi() + async def enableStormDmon(self, iden): + return await self.cell.enableStormDmon(iden) + @s_cell.adminapi() async def delStormDmon(self, iden): return await self.cell.delStormDmon(iden) @@ -3279,6 +3295,52 @@ async def addStormDmon(self, ddef): ddef['iden'] = iden return await self._push('storm:dmon:add', ddef) + @s_nexus.Pusher.onPushAuto('storm:dmon:bump') + async def bumpStormDmon(self, iden): + ddef = self.stormdmonhive.get(iden) + if ddef is None: + return False + + if self.isactive: + dmon = self.stormdmons.getDmon(iden) + if dmon is not None: + await dmon.bump() + + return True + + @s_nexus.Pusher.onPushAuto('storm:dmon:enable') + async def enableStormDmon(self, iden): + ddef = self.stormdmonhive.get(iden) + if ddef is None: + return False + + curv = ddef.get('enabled') + + ddef['enabled'] = True + await self.stormdmonhive.set(iden, ddef) + + if self.isactive and not curv: + dmon = self.stormdmons.getDmon(iden) + await dmon.run() + return True + + @s_nexus.Pusher.onPushAuto('storm:dmon:disable') + async def disableStormDmon(self, iden): + + ddef = self.stormdmonhive.get(iden) + if ddef is None: + return False + + curv = ddef.get('enabled') + + ddef['enabled'] = False + await self.stormdmonhive.set(iden, ddef) + + if self.isactive and curv: + dmon = self.stormdmons.getDmon(iden) + await dmon.stop() + return True + @s_nexus.Pusher.onPush('storm:dmon:add') async def _onAddStormDmon(self, ddef): iden = ddef['iden'] diff --git a/synapse/lib/spawn.py b/synapse/lib/spawn.py index 81db9122e6..19848e7e01 100644 --- a/synapse/lib/spawn.py +++ b/synapse/lib/spawn.py @@ -392,6 +392,10 @@ async def __anit__(self, spawninfo): self.getStormDmon = self.prox.getStormDmon self.getStormDmons = self.prox.getStormDmons + self.bumpStormDmon = self.prox.bumpStormDmon + self.enableStormDmon = self.prox.enableStormDmon + self.disableStormDmon = self.prox.disableStormDmon + # Cell specific apis self.setHiveKey = self.prox.setHiveKey self.getHiveKey = self.prox.getHiveKey diff --git a/synapse/lib/storm.py b/synapse/lib/storm.py index 9895d235a7..60276764fd 100644 --- a/synapse/lib/storm.py +++ b/synapse/lib/storm.py @@ -1010,7 +1010,6 @@ async def __anit__(self, core, iden, ddef): self.ddef = ddef self.task = None - self.loop_task = None self.enabled = ddef.get('enabled') self.user = core.auth.user(ddef.get('user')) @@ -1024,12 +1023,15 @@ async def __anit__(self, core, iden, ddef): async def stop(self): if self.task is not None: self.task.cancel() - if self.loop_task is not None: - self.loop_task.cancel() - self.status = 'stopped' + self.task = None async def run(self): - self.task = self.schedCoro(self._run()) + assert self.task is None + self.task = self.schedCoro(self.dmonloop()) + + async def bump(self): + await self.stop() + await self.run() def pack(self): retn = dict(self.ddef) @@ -1038,64 +1040,18 @@ def pack(self): retn['err'] = self.err_evnt.is_set() return retn - async def _run(self): - name = self.ddef.get('name', 'storm dmon') - info = {'iden': self.iden, 'name': name} - await self.core.boss.promote('storm:dmon:main', user=self.user, info=info) - while not self.isfini: - try: - logger.info(f'Dmon loop starting ({self.iden})') - synt = await self.core.boss.execute(self._innr_run(), - name='storm:dmon:loop', - user=self.user, - info=info) - self.loop_task = synt.task - await synt.waitfini() - # We have to give the callbacks a chance to run - # so that we can determine the status of the task - await asyncio.sleep(0) - if self.loop_task.cancelled(): - # Restart the loop right away - continue - # Check and re-raise exceptions - exc = self.loop_task.exception() - if exc: - raise exc - except asyncio.CancelledError: - if self.loop_task: - self.loop_task.cancel() - self.status = f'fatal error: Dmon main cancelled' - self.err_evnt.set() - logger.warning(f'Dmon main cancelled ({self.iden})') - raise - except s_exc.NoSuchView as e: - if self.loop_task: - self.loop_task.cancel() - self.status = f'fatal error: invalid view (iden={e.get("iden")})' - logger.warning(f'Dmon View is invalid. Exiting Dmon: ({self.iden})') - raise - except Exception as e: # pragma: no cover - self.status = f'error: {e}' - self.err_evnt.set() - logger.exception(f'Dmon error during loop task execution ({self.iden})') - await self.waitfini(timeout=1) - def _runLogAdd(self, mesg): self.runlog.append((s_common.now(), mesg)) def _getRunLog(self): return list(self.runlog) - async def _innr_run(self): + async def dmonloop(self): s_scope.set('storm:dmon', self.iden) - text = self.ddef.get('storm') - opts = self.ddef.get('stormopts', {}) - view_iden = opts.get('view') - view = self.core.getView(view_iden) - if view is None: - raise s_exc.NoSuchView(iden=view_iden) + info = {'iden': self.iden, 'name': self.ddef.get('name', 'storm dmon')} + await self.core.boss.promote('storm:dmon', user=self.user, info=info) def dmonPrint(evnt): self._runLogAdd(evnt) @@ -1109,6 +1065,16 @@ def dmonWarn(evnt): while not self.isfini: + text = self.ddef.get('storm') + opts = self.ddef.get('stormopts', {}) + + viewiden = opts.get('view') + view = self.core.getView(viewiden) + if view is None: + self.status = 'fatal error: invalid view' + logger.warning(f'Dmon View is invalid. Stopping Dmon.') + return + try: self.status = 'running' @@ -1125,15 +1091,13 @@ def dmonWarn(evnt): logger.warning(f'Dmon query exited: {self.iden}') - self.status = 'exited' - await self.waitfini(timeout=1) + self.status = 'sleeping' except s_stormctrl.StormExit: - self.status = 'exited' - await self.waitfini(timeout=1) + self.status = 'sleeping' except asyncio.CancelledError: - logger.warning(f'Dmon loop cancelled: ({self.iden})') + self.status = 'stopped' raise except Exception as e: @@ -1141,7 +1105,9 @@ def dmonWarn(evnt): logger.exception(f'Dmon error ({self.iden})') self.status = f'error: {e}' self.err_evnt.set() - await self.waitfini(timeout=1) + + # bottom of the loop... wait it out + await self.waitfini(timeout=1) class Runtime: ''' diff --git a/synapse/lib/stormtypes.py b/synapse/lib/stormtypes.py index 898c19a67b..d9af8ebb36 100644 --- a/synapse/lib/stormtypes.py +++ b/synapse/lib/stormtypes.py @@ -283,9 +283,13 @@ class LibDmon(Lib): def getObjLocals(self): return { 'add': self._libDmonAdd, + 'get': self._libDmonGet, 'del': self._libDmonDel, 'log': self._libDmonLog, 'list': self._libDmonList, + 'bump': self._libDmonBump, + 'stop': self._libDmonStop, + 'start': self._libDmonStart, } async def _libDmonDel(self, iden): @@ -308,6 +312,18 @@ async def _libDmonDel(self, iden): await self.runt.snap.core.delStormDmon(iden) + async def _libDmonGet(self, iden): + ''' + Return a Storm Dmon definition dict by iden. + + Args: + iden (str): The iden of the Storm Dmon. + + Returns: + (dict): A Storm daemon definition dict. + ''' + return await self.runt.snap.core.getStormDmon(iden) + async def _libDmonList(self): ''' Get a list of StormDmons. @@ -330,16 +346,16 @@ async def _libDmonLog(self, iden): self.runt.confirm(('dmon', 'log')) return await self.runt.snap.core.getStormDmonLog(iden) - async def _libDmonAdd(self, quer, name='noname'): + async def _libDmonAdd(self, text, name='noname'): ''' Add a StormDmon to the Cortex. Args: - quer (str): The query to execute. - + text (str): The Storm query to execute. name (str): The name of the Dmon. Examples: + Add a dmon that executes a query:: $lib.dmon.add(${ myquery }, name='example dmon') @@ -347,26 +363,83 @@ async def _libDmonAdd(self, quer, name='noname'): Returns: str: The iden of the newly created StormDmon. ''' - self.runt.confirm(('dmon', 'add')) + text = await tostr(text) + varz = await toprim(self.runt.vars) + + viewiden = self.runt.snap.view.iden + self.runt.confirm(('dmon', 'add'), gateiden=viewiden) # closure style capture of runtime - runtprims = await toprim(self.runt.vars) - runtvars = {k: v for (k, v) in runtprims.items() if s_msgpack.isok(v)} + varz = {k: v for (k, v) in varz.items() if s_msgpack.isok(v)} - opts = {'vars': runtvars, - 'view': self.runt.snap.view.iden, # Capture the current view iden. - } + opts = {'vars': varz, 'view': viewiden} ddef = { 'name': name, 'user': self.runt.user.iden, - 'storm': str(quer), + 'storm': text, 'enabled': True, 'stormopts': opts, } - dmoniden = await self.runt.snap.core.addStormDmon(ddef) - return dmoniden + return await self.runt.snap.core.addStormDmon(ddef) + + async def _libDmonBump(self, iden): + ''' + Restart the daemon + + Args: + iden (str): The GUID of the dmon to restart. + ''' + iden = await tostr(iden) + + ddef = await self.runt.snap.core.getStormDmon(iden) + if ddef is None: + return False + + viewiden = ddef['stormopts']['view'] + self.runt.confirm(('dmon', 'add'), gateiden=viewiden) + + await self.runt.snap.core.bumpStormDmon(iden) + return True + + async def _libDmonStop(self, iden): + ''' + Stop a storm dmon. + + Args: + iden (str): The GUID of the dmon to stop. + ''' + iden = await tostr(iden) + + ddef = await self.runt.snap.core.getStormDmon(iden) + if ddef is None: + return False + + viewiden = ddef['stormopts']['view'] + self.runt.confirm(('dmon', 'add'), gateiden=viewiden) + + await self.runt.snap.core.disableStormDmon(iden) + return True + + async def _libDmonStart(self, iden): + ''' + Start a storm dmon. + + Args: + iden (str): The GUID of the dmon to start. + ''' + iden = await tostr(iden) + + ddef = await self.runt.snap.core.getStormDmon(iden) + if ddef is None: + return False + + viewiden = ddef['stormopts']['view'] + self.runt.confirm(('dmon', 'add'), gateiden=viewiden) + + await self.runt.snap.core.enableStormDmon(iden) + return True @registry.registerLib class LibService(Lib): diff --git a/synapse/tests/test_cortex.py b/synapse/tests/test_cortex.py index ae0c9e8914..81f5c0d5ed 100644 --- a/synapse/tests/test_cortex.py +++ b/synapse/tests/test_cortex.py @@ -4211,59 +4211,6 @@ async def test_cortex_spawn_notsupported(self): with self.raises(s_exc.FeatureNotSupported): await core.getSpawnInfo() - async def test_cortex_storm_dmon_ps(self): - - with self.getTestDir() as dirn: - - async with await s_cortex.Cortex.anit(dirn) as core: - async with core.getLocalProxy() as prox: - await core.nodes('$lib.queue.add(visi)') - ddef = {'storm': '$lib.queue.get(visi).put(done) for $tick in $lib.time.ticker(1) {}'} - dmonpack = await prox.addStormDmon(ddef) - dmoniden = dmonpack.get('iden') - # Storm task pairs are promoted as tasks - retn = await prox.ps() - dmon_loop_tasks = [task for task in retn if task.get('name') == 'storm:dmon:loop'] - dmon_main_tasks = [task for task in retn if task.get('name') == 'storm:dmon:main'] - self.len(1, dmon_loop_tasks) - self.len(1, dmon_main_tasks) - self.eq(dmon_loop_tasks[0].get('info').get('iden'), dmoniden) - self.eq(dmon_main_tasks[0].get('info').get('iden'), dmoniden) - # We can kill the loop task and it will respawn - mpid = dmon_main_tasks[0].get('iden') - lpid = dmon_loop_tasks[0].get('iden') - self.true(await prox.kill(lpid)) - await asyncio.sleep(0) - retn = await prox.ps() - dmon_loop_tasks = [task for task in retn if task.get('name') == 'storm:dmon:loop'] - dmon_main_tasks = [task for task in retn if task.get('name') == 'storm:dmon:main'] - self.len(1, dmon_loop_tasks) - self.len(1, dmon_main_tasks) - self.eq(dmon_main_tasks[0].get('iden'), mpid) - self.ne(dmon_loop_tasks[0].get('iden'), lpid) - # If we kill the main task, there is no respawn - self.true(await prox.kill(mpid)) - await asyncio.sleep(0) - retn = await prox.ps() - dmon_loop_tasks = [task for task in retn if task.get('name') == 'storm:dmon:loop'] - dmon_main_tasks = [task for task in retn if task.get('name') == 'storm:dmon:main'] - self.len(0, dmon_loop_tasks) - self.len(0, dmon_main_tasks) - - iden = 'XXX' - with self.raises(s_exc.NoSuchIden): - await prox.delStormDmon(iden) - - with self.raises(s_exc.SchemaViolation): - await core.runStormDmon(iden, {}) - - with self.raises(s_exc.SchemaViolation): - await core.runStormDmon(iden, {'user': 'XXX'}) - - async with await s_cortex.Cortex.anit(dirn) as core: - # two entries means he ran twice ( once on add and once on restart ) - await core.nodes('$lib.queue.get(visi).gets(size=2)') - async def test_cortex_storm_dmon_view(self): with self.getTestDir() as dirn: @@ -4307,7 +4254,7 @@ async def test_cortex_storm_dmon_view(self): await core.nodes('test:int', opts={'view': view2_iden}) with self.getAsyncLoggerStream('synapse.lib.storm', - 'Dmon View is invalid. Exiting Dmon') as stream: + 'Dmon View is invalid. Stopping Dmon') as stream: async with self.getTestCore(dirn=dirn) as core: self.true(await stream.wait(6)) msgs = await core.stormlist('dmon.list') diff --git a/synapse/tests/test_lib_spawn.py b/synapse/tests/test_lib_spawn.py index d0343413df..82fbc25c77 100644 --- a/synapse/tests/test_lib_spawn.py +++ b/synapse/tests/test_lib_spawn.py @@ -620,7 +620,7 @@ async def test_spawn_dmon_cmds(self): ''' Copied from test-cortex_storm_lib_dmon_cmds ''' - async with self.getTestCoreAndProxy() as (_, prox): + async with self.getTestCoreAndProxy() as (core, prox): opts = {'spawn': True} await prox.storm(''' $q = $lib.queue.add(visi) @@ -643,7 +643,19 @@ async def test_spawn_dmon_cmds(self): msgs = await prox.storm('dmon.list', opts=opts).list() self.stormIsInPrint('(wootdmon ): running', msgs) - msgs = await prox.storm('$lib.dmon.del($ddef.iden)').list() + dmon = list(core.stormdmons.dmons.values())[0] + + # make the dmon blow up + q = '''$lib.queue.get(boom).put(hehe) + $q = $lib.queue.get(visi) + for ($offs, $item) in $q.gets(size=1) { $q.cull($offs) } + ''' + _ = await prox.storm(q, opts=opts).list() + + self.true(await s_coro.event_wait(dmon.err_evnt, 6)) + + msgs = await prox.storm('dmon.list').list() + self.stormIsInPrint('(wootdmon ): error', msgs) async def test_spawn_forked_view(self): async with self.getTestCoreAndProxy() as (core, prox): diff --git a/synapse/tests/test_lib_storm.py b/synapse/tests/test_lib_storm.py index 467d25e26f..6b51ceb2d1 100644 --- a/synapse/tests/test_lib_storm.py +++ b/synapse/tests/test_lib_storm.py @@ -226,6 +226,39 @@ async def test_lib_storm_basics(self): self.eq({'foo': 'bar'}, await core.callStorm('return($lib.dict( foo = bar ))')) + ddef0 = await core.callStorm('return($lib.dmon.add(${ $lib.queue.gen(hehedmon).put(lolz) $lib.time.sleep(10) }, name=hehedmon))') + ddef1 = await core.callStorm('return($lib.dmon.get($iden))', opts={'vars': {'iden': ddef0.get('iden')}}) + self.none(await core.callStorm('return($lib.dmon.get(newp))')) + + self.eq(ddef0['iden'], ddef1['iden']) + + self.eq((0, 'lolz'), await core.callStorm('return($lib.queue.gen(hehedmon).get(0))')) + + task = core.stormdmons.getDmon(ddef0['iden']).task + self.true(await core.callStorm(f'return($lib.dmon.bump($iden))', opts={'vars': {'iden': ddef0['iden']}})) + self.ne(task, core.stormdmons.getDmon(ddef0['iden']).task) + + self.true(await core.callStorm(f'return($lib.dmon.stop($iden))', opts={'vars': {'iden': ddef0['iden']}})) + self.none(core.stormdmons.getDmon(ddef0['iden']).task) + + self.true(await core.callStorm(f'return($lib.dmon.start($iden))', opts={'vars': {'iden': ddef0['iden']}})) + self.nn(core.stormdmons.getDmon(ddef0['iden']).task) + + self.false(await core.callStorm(f'return($lib.dmon.bump(newp))')) + self.false(await core.callStorm(f'return($lib.dmon.stop(newp))')) + self.false(await core.callStorm(f'return($lib.dmon.start(newp))')) + + self.eq((1, 'lolz'), await core.callStorm('return($lib.queue.gen(hehedmon).get(1))')) + + async with core.getLocalProxy() as proxy: + self.nn(await proxy.getStormDmon(ddef0['iden'])) + self.true(await proxy.bumpStormDmon(ddef0['iden'])) + self.true(await proxy.disableStormDmon(ddef0['iden'])) + self.true(await proxy.enableStormDmon(ddef0['iden'])) + self.false(await proxy.bumpStormDmon('newp')) + self.false(await proxy.disableStormDmon('newp')) + self.false(await proxy.enableStormDmon('newp')) + async def test_storm_pipe(self): async with self.getTestCore() as core: