Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added lib.dmon.get / lib.dmon.bump and refactored dmon loop #1998

Merged
merged 6 commits into from Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
62 changes: 62 additions & 0 deletions synapse/cortex.py
Expand Up @@ -751,6 +751,22 @@ async def getStormDmons(self):
async def getStormDmonLog(self, iden):
return await self.cell.getStormDmonLog(iden)

@s_cell.adminapi()
async def getStormDmon(self, iden):
return await self.cell.getStormDmon(iden)

@s_cell.adminapi()
async def bumpStormDmon(self, iden):
return await self.cell.bumpStormDmon(iden)

@s_cell.adminapi()
async def disableStormDmon(self, iden):
return await self.cell.disableStormDmon(iden)

@s_cell.adminapi()
async def enableStormDmon(self, iden):
return await self.cell.enableStormDmon(iden)

@s_cell.adminapi()
async def delStormDmon(self, iden):
return await self.cell.delStormDmon(iden)
Expand Down Expand Up @@ -3279,6 +3295,52 @@ async def addStormDmon(self, ddef):
ddef['iden'] = iden
return await self._push('storm:dmon:add', ddef)

@s_nexus.Pusher.onPushAuto('storm:dmon:bump')
async def bumpStormDmon(self, iden):
ddef = self.stormdmonhive.get(iden)
if ddef is None:
return False

if self.isactive:
dmon = self.stormdmons.getDmon(iden)
if dmon is not None:
await dmon.bump()

return True

@s_nexus.Pusher.onPushAuto('storm:dmon:enable')
async def enableStormDmon(self, iden):
ddef = self.stormdmonhive.get(iden)
if ddef is None:
return False

curv = ddef.get('enabled')

ddef['enabled'] = True
await self.stormdmonhive.set(iden, ddef)

if self.isactive and not curv:
dmon = self.stormdmons.getDmon(iden)
await dmon.run()
return True

@s_nexus.Pusher.onPushAuto('storm:dmon:disable')
async def disableStormDmon(self, iden):

ddef = self.stormdmonhive.get(iden)
if ddef is None:
return False

curv = ddef.get('enabled')

ddef['enabled'] = False
await self.stormdmonhive.set(iden, ddef)

if self.isactive and curv:
dmon = self.stormdmons.getDmon(iden)
await dmon.stop()
return True

@s_nexus.Pusher.onPush('storm:dmon:add')
async def _onAddStormDmon(self, ddef):
iden = ddef['iden']
Expand Down
4 changes: 4 additions & 0 deletions synapse/lib/spawn.py
Expand Up @@ -392,6 +392,10 @@ async def __anit__(self, spawninfo):
self.getStormDmon = self.prox.getStormDmon
self.getStormDmons = self.prox.getStormDmons

self.bumpStormDmon = self.prox.bumpStormDmon
self.enableStormDmon = self.prox.enableStormDmon
self.disableStormDmon = self.prox.disableStormDmon

# Cell specific apis
self.setHiveKey = self.prox.setHiveKey
self.getHiveKey = self.prox.getHiveKey
Expand Down
86 changes: 26 additions & 60 deletions synapse/lib/storm.py
Expand Up @@ -1010,7 +1010,6 @@ async def __anit__(self, core, iden, ddef):
self.ddef = ddef

self.task = None
self.loop_task = None
self.enabled = ddef.get('enabled')
self.user = core.auth.user(ddef.get('user'))

Expand All @@ -1024,12 +1023,15 @@ async def __anit__(self, core, iden, ddef):
async def stop(self):
if self.task is not None:
self.task.cancel()
if self.loop_task is not None:
self.loop_task.cancel()
self.status = 'stopped'
self.task = None

async def run(self):
self.task = self.schedCoro(self._run())
assert self.task is None
self.task = self.schedCoro(self.dmonloop())

async def bump(self):
await self.stop()
await self.run()

def pack(self):
retn = dict(self.ddef)
Expand All @@ -1038,64 +1040,18 @@ def pack(self):
retn['err'] = self.err_evnt.is_set()
return retn

async def _run(self):
name = self.ddef.get('name', 'storm dmon')
info = {'iden': self.iden, 'name': name}
await self.core.boss.promote('storm:dmon:main', user=self.user, info=info)
while not self.isfini:
try:
logger.info(f'Dmon loop starting ({self.iden})')
synt = await self.core.boss.execute(self._innr_run(),
name='storm:dmon:loop',
user=self.user,
info=info)
self.loop_task = synt.task
await synt.waitfini()
# We have to give the callbacks a chance to run
# so that we can determine the status of the task
await asyncio.sleep(0)
if self.loop_task.cancelled():
# Restart the loop right away
continue
# Check and re-raise exceptions
exc = self.loop_task.exception()
if exc:
raise exc
except asyncio.CancelledError:
if self.loop_task:
self.loop_task.cancel()
self.status = f'fatal error: Dmon main cancelled'
self.err_evnt.set()
logger.warning(f'Dmon main cancelled ({self.iden})')
raise
except s_exc.NoSuchView as e:
if self.loop_task:
self.loop_task.cancel()
self.status = f'fatal error: invalid view (iden={e.get("iden")})'
logger.warning(f'Dmon View is invalid. Exiting Dmon: ({self.iden})')
raise
except Exception as e: # pragma: no cover
self.status = f'error: {e}'
self.err_evnt.set()
logger.exception(f'Dmon error during loop task execution ({self.iden})')
await self.waitfini(timeout=1)

def _runLogAdd(self, mesg):
self.runlog.append((s_common.now(), mesg))

def _getRunLog(self):
return list(self.runlog)

async def _innr_run(self):
async def dmonloop(self):

s_scope.set('storm:dmon', self.iden)

text = self.ddef.get('storm')
opts = self.ddef.get('stormopts', {})
view_iden = opts.get('view')
view = self.core.getView(view_iden)
if view is None:
raise s_exc.NoSuchView(iden=view_iden)
info = {'iden': self.iden, 'name': self.ddef.get('name', 'storm dmon')}
await self.core.boss.promote('storm:dmon', user=self.user, info=info)

def dmonPrint(evnt):
self._runLogAdd(evnt)
Expand All @@ -1109,6 +1065,16 @@ def dmonWarn(evnt):

while not self.isfini:

text = self.ddef.get('storm')
opts = self.ddef.get('stormopts', {})

viewiden = opts.get('view')
view = self.core.getView(viewiden)
if view is None:
self.status = 'fatal error: invalid view'
logger.warning(f'Dmon View is invalid. Stopping Dmon.')
return

try:

self.status = 'running'
Expand All @@ -1125,23 +1091,23 @@ def dmonWarn(evnt):

logger.warning(f'Dmon query exited: {self.iden}')

self.status = 'exited'
await self.waitfini(timeout=1)
self.status = 'sleeping'

except s_stormctrl.StormExit:
self.status = 'exited'
await self.waitfini(timeout=1)
self.status = 'sleeping'

except asyncio.CancelledError:
logger.warning(f'Dmon loop cancelled: ({self.iden})')
self.status = 'stopped'
raise

except Exception as e:
self._runLogAdd(('err', s_common.excinfo(e)))
logger.exception(f'Dmon error ({self.iden})')
self.status = f'error: {e}'
self.err_evnt.set()
await self.waitfini(timeout=1)

# bottom of the loop... wait it out
await self.waitfini(timeout=1)

class Runtime:
'''
Expand Down
97 changes: 85 additions & 12 deletions synapse/lib/stormtypes.py
Expand Up @@ -283,9 +283,13 @@ class LibDmon(Lib):
def getObjLocals(self):
return {
'add': self._libDmonAdd,
'get': self._libDmonGet,
'del': self._libDmonDel,
'log': self._libDmonLog,
'list': self._libDmonList,
'bump': self._libDmonBump,
'stop': self._libDmonStop,
'start': self._libDmonStart,
}

async def _libDmonDel(self, iden):
Expand All @@ -308,6 +312,18 @@ async def _libDmonDel(self, iden):

await self.runt.snap.core.delStormDmon(iden)

async def _libDmonGet(self, iden):
'''
Return a Storm Dmon definition dict by iden.

Args:
iden (str): The iden of the Storm Dmon.

Returns:
(dict): A Storm daemon definition dict.
'''
return await self.runt.snap.core.getStormDmon(iden)

async def _libDmonList(self):
'''
Get a list of StormDmons.
Expand All @@ -330,43 +346,100 @@ async def _libDmonLog(self, iden):
self.runt.confirm(('dmon', 'log'))
return await self.runt.snap.core.getStormDmonLog(iden)

async def _libDmonAdd(self, quer, name='noname'):
async def _libDmonAdd(self, text, name='noname'):
'''
Add a StormDmon to the Cortex.

Args:
quer (str): The query to execute.

text (str): The Storm query to execute.
name (str): The name of the Dmon.

Examples:

Add a dmon that executes a query::

$lib.dmon.add(${ myquery }, name='example dmon')

Returns:
str: The iden of the newly created StormDmon.
'''
self.runt.confirm(('dmon', 'add'))
text = await tostr(text)
varz = await toprim(self.runt.vars)

viewiden = self.runt.snap.view.iden
self.runt.confirm(('dmon', 'add'), gateiden=viewiden)

# closure style capture of runtime
runtprims = await toprim(self.runt.vars)
runtvars = {k: v for (k, v) in runtprims.items() if s_msgpack.isok(v)}
varz = {k: v for (k, v) in varz.items() if s_msgpack.isok(v)}

opts = {'vars': runtvars,
'view': self.runt.snap.view.iden, # Capture the current view iden.
}
opts = {'vars': varz, 'view': viewiden}

ddef = {
'name': name,
'user': self.runt.user.iden,
'storm': str(quer),
'storm': text,
'enabled': True,
'stormopts': opts,
}

dmoniden = await self.runt.snap.core.addStormDmon(ddef)
return dmoniden
return await self.runt.snap.core.addStormDmon(ddef)

async def _libDmonBump(self, iden):
'''
Restart the daemon

Args:
iden (str): The GUID of the dmon to restart.
'''
iden = await tostr(iden)

ddef = await self.runt.snap.core.getStormDmon(iden)
if ddef is None:
return False

viewiden = ddef['stormopts']['view']
self.runt.confirm(('dmon', 'add'), gateiden=viewiden)

await self.runt.snap.core.bumpStormDmon(iden)
return True

async def _libDmonStop(self, iden):
'''
Stop a storm dmon.

Args:
iden (str): The GUID of the dmon to stop.
'''
iden = await tostr(iden)

ddef = await self.runt.snap.core.getStormDmon(iden)
if ddef is None:
return False

viewiden = ddef['stormopts']['view']
self.runt.confirm(('dmon', 'add'), gateiden=viewiden)

await self.runt.snap.core.disableStormDmon(iden)
return True

async def _libDmonStart(self, iden):
'''
Start a storm dmon.

Args:
iden (str): The GUID of the dmon to start.
'''
iden = await tostr(iden)

ddef = await self.runt.snap.core.getStormDmon(iden)
if ddef is None:
return False

viewiden = ddef['stormopts']['view']
self.runt.confirm(('dmon', 'add'), gateiden=viewiden)

await self.runt.snap.core.enableStormDmon(iden)
return True

@registry.registerLib
class LibService(Lib):
Expand Down