Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for async triggers #2464

Merged
merged 18 commits into from Nov 16, 2021
10 changes: 6 additions & 4 deletions synapse/lib/cell.py
Expand Up @@ -1841,11 +1841,14 @@ async def getAuthUsers(self, archived=False):
async def getAuthRoles(self):
return [r.pack() for r in self.auth.roles()]

async def dyniter(self, iden, todo, gatekeys=()):

async def reqGateKeys(self, gatekeys):
for useriden, perm, gateiden in gatekeys:
(await self.auth.reqUser(useriden)).confirm(perm, gateiden=gateiden)

async def dyniter(self, iden, todo, gatekeys=()):

await self.reqGateKeys(gatekeys)

item = self.dynitems.get(iden)
name, args, kwargs = todo

Expand All @@ -1855,8 +1858,7 @@ async def dyniter(self, iden, todo, gatekeys=()):

async def dyncall(self, iden, todo, gatekeys=()):

for useriden, perm, gateiden in gatekeys:
(await self.auth.reqUser(useriden)).confirm(perm, gateiden=gateiden)
await self.reqGateKeys(gatekeys)

item = self.dynitems.get(iden)
if item is None:
Expand Down
1 change: 1 addition & 0 deletions synapse/lib/lmdbslab.py
Expand Up @@ -364,6 +364,7 @@ async def pop(self, name, offs):
abrv = self.abrv.nameToAbrv(name)
byts = self.slab.pop(abrv + s_common.int64en(offs), db=self.qdata)
if byts is not None:
self.sizes.set(name, self.sizes.get(name) - 1)
return (offs, s_msgpack.un(byts))

async def put(self, name, item, reqid=None):
Expand Down
7 changes: 7 additions & 0 deletions synapse/lib/storm.py
Expand Up @@ -910,6 +910,8 @@
('--tag', {'help': 'Tag to fire on.'}),
('--prop', {'help': 'Property to fire on.'}),
('--query', {'help': 'Query for the trigger to execute.', 'required': True}),
('--async', {'default': False, 'action': 'store_true',
'help': 'Make the trigger run in the background.'}),
('--disabled', {'default': False, 'action': 'store_true',
'help': 'Create the trigger in disabled state.'}),
('--name', {'help': 'Human friendly name of the trigger.'}),
Expand Down Expand Up @@ -1636,6 +1638,11 @@ async def _onRuntFini(self):
if isinstance(valu, s_base.Base):
await valu.fini()

async def reqGateKeys(self, gatekeys):
if self.asroot:
return
await self.snap.core.reqGateKeys(gatekeys)

async def dyncall(self, iden, todo, gatekeys=()):
# bypass all perms checks if we are running asroot
if self.asroot:
Expand Down
55 changes: 32 additions & 23 deletions synapse/lib/stormtypes.py
Expand Up @@ -2834,10 +2834,12 @@ class Queue(StormType):
{'name': 'offs', 'type': 'int', 'default': None,
'desc': 'Offset to pop the item from. If not specified, the first item in the queue will be'
' popped.', },
{'name': 'wait', 'type': 'boolean', 'default': False,
'desc': 'Wait for an item to be available to pop.'},
),
'returns': {'type': 'list',
'desc': 'The offset and item popped from the queue. If there is no item at the '
'offset or the queue is empty, it returns null.', }}},
'offset or the queue is empty and wait is false, it returns null.', }}},
{'name': 'put', 'desc': 'Put an item into the queue.',
'type': {'type': 'function', '_funcname': '_methQueuePut',
'args': (
Expand Down Expand Up @@ -2908,57 +2910,56 @@ def getObjLocals(self):

async def _methQueueCull(self, offs):
offs = await toint(offs)
todo = s_common.todo('coreQueueCull', self.name, offs)
gatekeys = self._getGateKeys('get')
await self.runt.dyncall('cortex', todo, gatekeys=gatekeys)
await self.runt.reqGateKeys(gatekeys)
await self.runt.snap.core.coreQueueCull(self.name, offs)

async def _methQueueSize(self):
todo = s_common.todo('coreQueueSize', self.name)
gatekeys = self._getGateKeys('get')
return await self.runt.dyncall('cortex', todo, gatekeys=gatekeys)
await self.runt.reqGateKeys(gatekeys)
return await self.runt.snap.core.coreQueueSize(self.name)

async def _methQueueGets(self, offs=0, wait=True, cull=False, size=None):
wait = await toint(wait)
offs = await toint(offs)
size = await toint(size, noneok=True)

if size is not None:
size = await toint(size)

todo = s_common.todo('coreQueueGets', self.name, offs, cull=cull, wait=wait, size=size)
gatekeys = self._getGateKeys('get')
await self.runt.reqGateKeys(gatekeys)

async for item in self.runt.dyniter('cortex', todo, gatekeys=gatekeys):
async for item in self.runt.snap.core.coreQueueGets(self.name, offs, cull=cull, wait=wait, size=size):
yield item

async def _methQueuePuts(self, items):
items = await toprim(items)
todo = s_common.todo('coreQueuePuts', self.name, items)
gatekeys = self._getGateKeys('put')
return await self.runt.dyncall('cortex', todo, gatekeys=gatekeys)
await self.runt.reqGateKeys(gatekeys)
return await self.runt.snap.core.coreQueuePuts(self.name, items)

async def _methQueueGet(self, offs=0, cull=True, wait=True):
offs = await toint(offs)
wait = await toint(wait)

todo = s_common.todo('coreQueueGet', self.name, offs, cull=cull, wait=wait)
gatekeys = self._getGateKeys('get')
await self.runt.reqGateKeys(gatekeys)

return await self.runt.dyncall('cortex', todo, gatekeys=gatekeys)
return await self.runt.snap.core.coreQueueGet(self.name, offs, cull=cull, wait=wait)

async def _methQueuePop(self, offs=None):
async def _methQueuePop(self, offs=None, wait=False):
offs = await toint(offs, noneok=True)
wait = await tobool(wait)

gatekeys = self._getGateKeys('get')
await self.runt.reqGateKeys(gatekeys)

# emulate the old behavior on no argument
core = self.runt.snap.core
if offs is None:
todo = s_common.todo('coreQueueGets', self.name, 0, cull=True, wait=False)
async for item in self.runt.dyniter('cortex', todo, gatekeys=gatekeys):
await self._methQueueCull(item[0])
return item
async for item in core.coreQueueGets(self.name, 0, wait=wait):
return await core.coreQueuePop(self.name, item[0])
return

todo = s_common.todo('coreQueuePop', self.name, offs)
return await self.runt.dyncall('cortex', todo, gatekeys=gatekeys)
return await core.coreQueuePop(self.name, offs)

async def _methQueuePut(self, item):
return await self._methQueuePuts((item,))
Expand Down Expand Up @@ -5660,6 +5661,9 @@ class View(Prim):
'returns': {'type': 'list', 'desc': 'A list of lines that can be printed, representing a View.', }}},
{'name': 'merge', 'desc': 'Merge a forked View back into its parent View.',
'type': {'type': 'function', '_funcname': '_methViewMerge',
'args': (
{'name': 'force', 'type': 'boolean', 'default': False, 'desc': 'Force the view to merge if possible.'},
),
'returns': {'type': 'null', }}},
{'name': 'getEdges', 'desc': 'Get node information for Edges in the View.',
'type': {'type': 'function', '_funcname': '_methGetEdges',
Expand Down Expand Up @@ -5830,13 +5834,14 @@ async def _methViewFork(self, name=None):

return View(self.runt, newv, path=self.path)

async def _methViewMerge(self):
async def _methViewMerge(self, force=False):
'''
Merge a forked view back into its parent.
'''
force = await tobool(force)
useriden = self.runt.user.iden
viewiden = self.valu.get('iden')
todo = s_common.todo('merge', useriden=useriden)
todo = s_common.todo('merge', useriden=useriden, force=force)
await self.runt.dyncall(viewiden, todo)

@registry.registerLib
Expand Down Expand Up @@ -6095,6 +6100,10 @@ async def set(self, name, valu):
useriden = self.runt.user.iden
viewiden = self.runt.snap.view.iden

name = await tostr(name)
if name == 'async':
valu = await tobool(valu)

gatekeys = ((useriden, ('trigger', 'set'), viewiden),)
todo = ('setTriggerInfo', (trigiden, name, valu), {})
await self.runt.dyncall(viewiden, todo, gatekeys=gatekeys)
Expand Down
26 changes: 18 additions & 8 deletions synapse/lib/trigger.py
Expand Up @@ -40,6 +40,7 @@
'user': {'type': 'string', 'pattern': s_config.re_iden},
'cond': {'enum': ['node:add', 'node:del', 'tag:add', 'tag:del', 'prop:set']},
'storm': {'type': 'string'},
'async': {'type': 'boolean'},
'enabled': {'type': 'boolean'},
},
'additionalProperties': True,
Expand Down Expand Up @@ -297,11 +298,14 @@ def __init__(self, view, tdef):
self.errcount = 0
self.lasterrs = collections.deque((), maxlen=5)

useriden = self.tdef.get('user')
self.user = self.view.core.auth.user(useriden)

async def set(self, name, valu):
'''
Set one of the dynamic elements of the trigger definition.
'''
assert name in ('enabled', 'storm', 'doc', 'name')
assert name in ('enabled', 'storm', 'doc', 'name', 'async')

if valu == self.tdef.get(name):
return
Expand All @@ -319,18 +323,24 @@ async def execute(self, node, vars=None, view=None):
'''
Actually execute the query
'''
opts = {}

if not self.tdef.get('enabled'):
return

useriden = self.tdef.get('user')
user = self.view.core.auth.user(useriden)
locked = user.info.get('locked')
if self.tdef.get('async'):
triginfo = {'buid': node.buid, 'trig': self.iden}
invisig0th marked this conversation as resolved.
Show resolved Hide resolved
await self.view.addTrigQueue(triginfo)
return

return await self._execute(node, vars=vars, view=view)

async def _execute(self, node, vars=None, view=None):

opts = {}
locked = self.user.info.get('locked')
if locked:
if not self.lockwarned:
self.lockwarned = True
logger.warning('Skipping trigger execution because user {useriden} is locked')
logger.warning(f'Skipping trigger execution because user {self.user.iden} is locked')
return

tag = self.tdef.get('tag')
Expand All @@ -345,8 +355,8 @@ async def execute(self, node, vars=None, view=None):
view = self.view.iden

opts = {
'user': useriden,
'view': view,
'user': self.user.iden,
}

if vars is not None:
Expand Down
64 changes: 61 additions & 3 deletions synapse/lib/view.py
@@ -1,3 +1,4 @@
import shutil
import asyncio
import logging
import itertools
Expand All @@ -13,6 +14,7 @@
import synapse.lib.config as s_config
import synapse.lib.spooled as s_spooled
import synapse.lib.trigger as s_trigger
import synapse.lib.lmdbslab as s_lmdbslab
import synapse.lib.stormctrl as s_stormctrl
import synapse.lib.stormtypes as s_stormtypes

Expand Down Expand Up @@ -82,6 +84,11 @@ async def __anit__(self, core, node):
self.info = await node.dict()

self.core = core
self.dirn = s_common.gendir(core.dirn, 'views', self.iden)

slabpath = s_common.genpath(self.dirn, 'viewstate.lmdb')
self.viewslab = await s_lmdbslab.Slab.anit(slabpath)
self.trigqueue = self.viewslab.getSeqn('trigqueue')

trignode = await node.open(('triggers',))
self.trigdict = await trignode.dict()
Expand All @@ -99,6 +106,8 @@ async def __anit__(self, core, node):

await s_nexus.Pusher.__anit__(self, iden=self.iden, nexsroot=core.nexsroot)

self.onfini(self.viewslab.fini)

self.layers = []
self.invalid = None
self.parent = None # The view this view was forked from
Expand All @@ -113,6 +122,40 @@ async def __anit__(self, core, node):
# isolate some initialization to easily override.
await self._initViewLayers()

self.trigtask = None
if self.trigqueue.last():
self.trigtask = self.schedCoro(self._trigQueueLoop())

async def _trigQueueLoop(self):

while not self.isfini:

async for offs, triginfo in self.trigqueue.gets(0):
vEpiphyte marked this conversation as resolved.
Show resolved Hide resolved

buid = triginfo.get('buid')
trigiden = triginfo.get('trig')

try:
trig = self.triggers.get(trigiden)
if trig is None:
continue

async with await self.snap(trig.user) as snap:
node = await snap.getNodeByBuid(buid)
if node is None:
continue

await trig._execute(node)
invisig0th marked this conversation as resolved.
Show resolved Hide resolved

except asyncio.CancelledError: # pragma: no cover
raise

except Exception as e: # pragma: no cover
logger.exception(f'trigQueueLoop() on view {self.iden}')
invisig0th marked this conversation as resolved.
Show resolved Hide resolved

finally:
await self.delTrigQueue(offs)

async def getStorNodes(self, buid):
'''
Return a list of storage nodes for the given buid in layer order.
Expand Down Expand Up @@ -379,6 +422,16 @@ async def snap(self, user):

return await self.snapctor(self, user)

@s_nexus.Pusher.onPushAuto('trig:q:add')
async def addTrigQueue(self, triginfo):
self.trigqueue.add(triginfo)
if self.trigtask is None:
self.trigtask = self.schedCoro(self._trigQueueLoop())

@s_nexus.Pusher.onPushAuto('trig:q:del')
async def delTrigQueue(self, offs):
self.trigqueue.pop(offs)

@s_nexus.Pusher.onPushAuto('view:set')
async def setViewInfo(self, name, valu):
'''
Expand Down Expand Up @@ -476,7 +529,7 @@ async def fork(self, ldef=None, vdef=None):

return await self.core.addView(vdef)

async def merge(self, useriden=None):
async def merge(self, useriden=None, force=False):
'''
Merge this view into its parent. All changes made to this view will be applied to the parent. Parent's
triggers will be run.
Expand All @@ -488,7 +541,7 @@ async def merge(self, useriden=None):
else:
user = await self.core.auth.reqUser(useriden)

await self.mergeAllowed(user)
await self.mergeAllowed(user, force=force)

await self.core.boss.promote('storm', user=user, info={'merging': self.iden})

Expand Down Expand Up @@ -530,14 +583,17 @@ async def _tagPropSetConfirm(self, user, snap, splice):
perms = ('node', 'tag', 'add', *tag.split('.'))
self.parent._confirm(user, perms)

async def mergeAllowed(self, user=None):
async def mergeAllowed(self, user=None, force=False):
'''
Check whether a user can merge a view into its parent.
'''
fromlayr = self.layers[0]
if self.parent is None:
raise s_exc.CantMergeView(mesg=f'Cannot merge a view {self.iden} than has not been forked')

if self.trigqueue.size and not force:
raise s_exc.CantMergeView(mesg=f'There are still {self.trigqueue.size} triggers waiting to complete.', canforce=True)
invisig0th marked this conversation as resolved.
Show resolved Hide resolved

parentlayr = self.parent.layers[0]
if parentlayr.readonly:
raise s_exc.ReadOnlyLayer(mesg="May not merge if the parent's write layer is read-only")
Expand Down Expand Up @@ -642,6 +698,7 @@ async def addTrigger(self, tdef):
root = await self.core.auth.getUserByName('root')

tdef.setdefault('user', root.iden)
tdef.setdefault('async', False)
tdef.setdefault('enabled', True)

s_trigger.reqValidTdef(tdef)
Expand Down Expand Up @@ -720,6 +777,7 @@ async def delete(self):
'''
await self.fini()
await self.node.pop()
shutil.rmtree(self.dirn, ignore_errors=True)

async def addNodeEdits(self, edits, meta):
'''
Expand Down