Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for async triggers #2464

Merged
merged 18 commits into from Nov 16, 2021
4 changes: 4 additions & 0 deletions synapse/cortex.py
Expand Up @@ -1257,10 +1257,14 @@ async def initServiceActive(self):
if self.conf.get('cron:enable'):
await self.agenda.start()
await self.stormdmons.start()
for view in self.views.values():
await view.initTrigTask()

async def initServicePassive(self):
await self.agenda.stop()
await self.stormdmons.stop()
for view in self.views.values():
await view.finiTrigTask()

@s_nexus.Pusher.onPushAuto('model:depr:lock')
async def setDeprLock(self, name, locked):
Expand Down
10 changes: 6 additions & 4 deletions synapse/lib/cell.py
Expand Up @@ -1841,11 +1841,14 @@ async def getAuthUsers(self, archived=False):
async def getAuthRoles(self):
return [r.pack() for r in self.auth.roles()]

async def dyniter(self, iden, todo, gatekeys=()):

async def reqGateKeys(self, gatekeys):
for useriden, perm, gateiden in gatekeys:
(await self.auth.reqUser(useriden)).confirm(perm, gateiden=gateiden)

async def dyniter(self, iden, todo, gatekeys=()):

await self.reqGateKeys(gatekeys)

item = self.dynitems.get(iden)
name, args, kwargs = todo

Expand All @@ -1855,8 +1858,7 @@ async def dyniter(self, iden, todo, gatekeys=()):

async def dyncall(self, iden, todo, gatekeys=()):

for useriden, perm, gateiden in gatekeys:
(await self.auth.reqUser(useriden)).confirm(perm, gateiden=gateiden)
await self.reqGateKeys(gatekeys)

item = self.dynitems.get(iden)
if item is None:
Expand Down
1 change: 1 addition & 0 deletions synapse/lib/lmdbslab.py
Expand Up @@ -364,6 +364,7 @@ async def pop(self, name, offs):
abrv = self.abrv.nameToAbrv(name)
byts = self.slab.pop(abrv + s_common.int64en(offs), db=self.qdata)
if byts is not None:
self.sizes.set(name, self.sizes.get(name) - 1)
return (offs, s_msgpack.un(byts))

async def put(self, name, item, reqid=None):
Expand Down
14 changes: 11 additions & 3 deletions synapse/lib/storm.py
Expand Up @@ -910,6 +910,8 @@
('--tag', {'help': 'Tag to fire on.'}),
('--prop', {'help': 'Property to fire on.'}),
('--query', {'help': 'Query for the trigger to execute.', 'required': True}),
('--async', {'default': False, 'action': 'store_true',
'help': 'Make the trigger run in the background.'}),
('--disabled', {'default': False, 'action': 'store_true',
'help': 'Create the trigger in disabled state.'}),
('--name', {'help': 'Human friendly name of the trigger.'}),
Expand Down Expand Up @@ -951,11 +953,12 @@

if $triggers {

$lib.print("user iden en? cond object storm query")
$lib.print("user iden en? async? cond object storm query")

for $trigger in $triggers {
$user = $trigger.username.ljust(10)
$iden = $trigger.iden.ljust(12)
$async = $lib.model.type(bool).repr($trigger.async).ljust(6)
$enabled = $lib.model.type(bool).repr($trigger.enabled).ljust(6)
$cond = $trigger.cond.ljust(9)

Expand Down Expand Up @@ -984,8 +987,8 @@
$obj2 = ' '
}

$lib.print("{user} {iden} {enabled} {cond} {obj} {obj2} {query}",
user=$user, iden=$iden, enabled=$enabled, cond=$cond,
$lib.print("{user} {iden} {enabled} {async} {cond} {obj} {obj2} {query}",
user=$user, iden=$iden, enabled=$enabled, async=$async, cond=$cond,
obj=$obj, obj2=$obj2, query=$trigger.storm)
}
} else {
Expand Down Expand Up @@ -1636,6 +1639,11 @@ async def _onRuntFini(self):
if isinstance(valu, s_base.Base):
await valu.fini()

async def reqGateKeys(self, gatekeys):
if self.asroot:
return
await self.snap.core.reqGateKeys(gatekeys)

async def dyncall(self, iden, todo, gatekeys=()):
# bypass all perms checks if we are running asroot
if self.asroot:
Expand Down
55 changes: 32 additions & 23 deletions synapse/lib/stormtypes.py
Expand Up @@ -2834,10 +2834,12 @@ class Queue(StormType):
{'name': 'offs', 'type': 'int', 'default': None,
'desc': 'Offset to pop the item from. If not specified, the first item in the queue will be'
' popped.', },
{'name': 'wait', 'type': 'boolean', 'default': False,
'desc': 'Wait for an item to be available to pop.'},
),
'returns': {'type': 'list',
'desc': 'The offset and item popped from the queue. If there is no item at the '
'offset or the queue is empty, it returns null.', }}},
'offset or the queue is empty and wait is false, it returns null.', }}},
{'name': 'put', 'desc': 'Put an item into the queue.',
'type': {'type': 'function', '_funcname': '_methQueuePut',
'args': (
Expand Down Expand Up @@ -2908,57 +2910,56 @@ def getObjLocals(self):

async def _methQueueCull(self, offs):
offs = await toint(offs)
todo = s_common.todo('coreQueueCull', self.name, offs)
gatekeys = self._getGateKeys('get')
await self.runt.dyncall('cortex', todo, gatekeys=gatekeys)
await self.runt.reqGateKeys(gatekeys)
await self.runt.snap.core.coreQueueCull(self.name, offs)

async def _methQueueSize(self):
todo = s_common.todo('coreQueueSize', self.name)
gatekeys = self._getGateKeys('get')
return await self.runt.dyncall('cortex', todo, gatekeys=gatekeys)
await self.runt.reqGateKeys(gatekeys)
return await self.runt.snap.core.coreQueueSize(self.name)

async def _methQueueGets(self, offs=0, wait=True, cull=False, size=None):
wait = await toint(wait)
offs = await toint(offs)
size = await toint(size, noneok=True)

if size is not None:
size = await toint(size)

todo = s_common.todo('coreQueueGets', self.name, offs, cull=cull, wait=wait, size=size)
gatekeys = self._getGateKeys('get')
await self.runt.reqGateKeys(gatekeys)

async for item in self.runt.dyniter('cortex', todo, gatekeys=gatekeys):
async for item in self.runt.snap.core.coreQueueGets(self.name, offs, cull=cull, wait=wait, size=size):
yield item

async def _methQueuePuts(self, items):
items = await toprim(items)
todo = s_common.todo('coreQueuePuts', self.name, items)
gatekeys = self._getGateKeys('put')
return await self.runt.dyncall('cortex', todo, gatekeys=gatekeys)
await self.runt.reqGateKeys(gatekeys)
return await self.runt.snap.core.coreQueuePuts(self.name, items)

async def _methQueueGet(self, offs=0, cull=True, wait=True):
offs = await toint(offs)
wait = await toint(wait)

todo = s_common.todo('coreQueueGet', self.name, offs, cull=cull, wait=wait)
gatekeys = self._getGateKeys('get')
await self.runt.reqGateKeys(gatekeys)

return await self.runt.dyncall('cortex', todo, gatekeys=gatekeys)
return await self.runt.snap.core.coreQueueGet(self.name, offs, cull=cull, wait=wait)

async def _methQueuePop(self, offs=None):
async def _methQueuePop(self, offs=None, wait=False):
offs = await toint(offs, noneok=True)
wait = await tobool(wait)

gatekeys = self._getGateKeys('get')
await self.runt.reqGateKeys(gatekeys)

# emulate the old behavior on no argument
core = self.runt.snap.core
if offs is None:
todo = s_common.todo('coreQueueGets', self.name, 0, cull=True, wait=False)
async for item in self.runt.dyniter('cortex', todo, gatekeys=gatekeys):
await self._methQueueCull(item[0])
return item
async for item in core.coreQueueGets(self.name, 0, wait=wait):
return await core.coreQueuePop(self.name, item[0])
return

todo = s_common.todo('coreQueuePop', self.name, offs)
return await self.runt.dyncall('cortex', todo, gatekeys=gatekeys)
return await core.coreQueuePop(self.name, offs)

async def _methQueuePut(self, item):
return await self._methQueuePuts((item,))
Expand Down Expand Up @@ -5660,6 +5661,9 @@ class View(Prim):
'returns': {'type': 'list', 'desc': 'A list of lines that can be printed, representing a View.', }}},
{'name': 'merge', 'desc': 'Merge a forked View back into its parent View.',
'type': {'type': 'function', '_funcname': '_methViewMerge',
'args': (
{'name': 'force', 'type': 'boolean', 'default': False, 'desc': 'Force the view to merge if possible.'},
),
'returns': {'type': 'null', }}},
{'name': 'getEdges', 'desc': 'Get node information for Edges in the View.',
'type': {'type': 'function', '_funcname': '_methGetEdges',
Expand Down Expand Up @@ -5830,13 +5834,14 @@ async def _methViewFork(self, name=None):

return View(self.runt, newv, path=self.path)

async def _methViewMerge(self):
async def _methViewMerge(self, force=False):
'''
Merge a forked view back into its parent.
'''
force = await tobool(force)
useriden = self.runt.user.iden
viewiden = self.valu.get('iden')
todo = s_common.todo('merge', useriden=useriden)
todo = s_common.todo('merge', useriden=useriden, force=force)
await self.runt.dyncall(viewiden, todo)

@registry.registerLib
Expand Down Expand Up @@ -6095,6 +6100,10 @@ async def set(self, name, valu):
useriden = self.runt.user.iden
viewiden = self.runt.snap.view.iden

name = await tostr(name)
if name == 'async':
valu = await tobool(valu)

gatekeys = ((useriden, ('trigger', 'set'), viewiden),)
todo = ('setTriggerInfo', (trigiden, name, valu), {})
await self.runt.dyncall(viewiden, todo, gatekeys=gatekeys)
Expand Down
26 changes: 18 additions & 8 deletions synapse/lib/trigger.py
Expand Up @@ -40,6 +40,7 @@
'user': {'type': 'string', 'pattern': s_config.re_iden},
'cond': {'enum': ['node:add', 'node:del', 'tag:add', 'tag:del', 'prop:set']},
'storm': {'type': 'string'},
'async': {'type': 'boolean'},
'enabled': {'type': 'boolean'},
},
'additionalProperties': True,
Expand Down Expand Up @@ -297,11 +298,14 @@ def __init__(self, view, tdef):
self.errcount = 0
self.lasterrs = collections.deque((), maxlen=5)

useriden = self.tdef.get('user')
self.user = self.view.core.auth.user(useriden)

async def set(self, name, valu):
'''
Set one of the dynamic elements of the trigger definition.
'''
assert name in ('enabled', 'storm', 'doc', 'name')
assert name in ('enabled', 'storm', 'doc', 'name', 'async')

if valu == self.tdef.get(name):
return
Expand All @@ -319,18 +323,24 @@ async def execute(self, node, vars=None, view=None):
'''
Actually execute the query
'''
opts = {}

if not self.tdef.get('enabled'):
return

useriden = self.tdef.get('user')
user = self.view.core.auth.user(useriden)
locked = user.info.get('locked')
if self.tdef.get('async'):
triginfo = {'buid': node.buid, 'trig': self.iden, 'vars': vars}
await self.view.addTrigQueue(triginfo)
return

return await self._execute(node, vars=vars, view=view)

async def _execute(self, node, vars=None, view=None):

opts = {}
locked = self.user.info.get('locked')
if locked:
if not self.lockwarned:
self.lockwarned = True
logger.warning('Skipping trigger execution because user {useriden} is locked')
logger.warning(f'Skipping trigger execution because user {self.user.iden} is locked')
return

tag = self.tdef.get('tag')
Expand All @@ -345,8 +355,8 @@ async def execute(self, node, vars=None, view=None):
view = self.view.iden

opts = {
'user': useriden,
'view': view,
'user': self.user.iden,
}

if vars is not None:
Expand Down