Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
invisig0th committed Nov 12, 2021
1 parent ec5fd7f commit 4c6385e
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 7 deletions.
4 changes: 4 additions & 0 deletions synapse/cortex.py
Expand Up @@ -1257,10 +1257,14 @@ async def initServiceActive(self):
if self.conf.get('cron:enable'):
await self.agenda.start()
await self.stormdmons.start()
for view in self.views.values():
await view.initTrigTask()

async def initServicePassive(self):
await self.agenda.stop()
await self.stormdmons.stop()
for view in self.views.values():
await view.finiTrigTask()

@s_nexus.Pusher.onPushAuto('model:depr:lock')
async def setDeprLock(self, name, locked):
Expand Down
21 changes: 17 additions & 4 deletions synapse/lib/view.py
Expand Up @@ -123,8 +123,23 @@ async def __anit__(self, core, node):
await self._initViewLayers()

self.trigtask = None
if self.trigqueue.last():
self.trigtask = self.schedCoro(self._trigQueueLoop())
await self.initTrigTask()

async def initTrigTask(self):

if self.trigtask is not None:
return

if not await self.core.isCellActive():
return

self.trigtask = self.schedCoro(self._trigQueueLoop())

async def finiTrigTask(self):

if self.trigtask is not None:
self.trigtask.cancel()
self.trigtask = None

async def _trigQueueLoop(self):

Expand Down Expand Up @@ -426,8 +441,6 @@ async def snap(self, user):
@s_nexus.Pusher.onPushAuto('trig:q:add')
async def addTrigQueue(self, triginfo):
self.trigqueue.add(triginfo)
if self.trigtask is None:
self.trigtask = self.schedCoro(self._trigQueueLoop())

@s_nexus.Pusher.onPushAuto('trig:q:del')
async def delTrigQueue(self, offs):
Expand Down
41 changes: 38 additions & 3 deletions synapse/tests/test_lib_trigger.py
Expand Up @@ -5,8 +5,10 @@

from synapse.common import aspin

import synapse.cortex as s_cortex
import synapse.telepath as s_telepath
import synapse.tests.utils as s_t_utils
import synapse.tools.backup as s_tools_backup

class TrigTest(s_t_utils.SynTest):

Expand Down Expand Up @@ -39,8 +41,7 @@ async def test_trigger_async(self):

# kill off the async consumer and queue up some requests
# to test persistance and proper resuming...
core.view.trigtask.cancel()
core.view.trigtask = True # lolz...
await core.view.finiTrigTask()

trigiden = await core.callStorm('return($lib.view.get().triggers.0.iden)')
self.nn(trigiden)
Expand All @@ -63,7 +64,7 @@ async def test_trigger_async(self):
viewiden = await core.callStorm('return($lib.view.get().fork().iden)')

view = core.getView(viewiden)
view.trigtask = True # lolz...
await view.finiTrigTask()

opts = {'view': viewiden}
await core.stormlist('trigger.add node:add --async --form inet:ipv4 --query { [+#foo] $lib.queue.gen(foo).put($node.iden()) }', opts=opts)
Expand All @@ -77,6 +78,40 @@ async def test_trigger_async(self):

self.false(os.path.isdir(view.dirn))

async def test_trigger_async_mirror(self):

with self.getTestDir() as dirn:

path00 = s_common.gendir(dirn, 'core00')
path01 = s_common.gendir(dirn, 'core01')

async with self.getTestCore(dirn=path00) as core00:
await core00.stormlist('trigger.add node:add --async --form inet:ipv4 --query { [+#foo] $lib.queue.gen(foo).put($node.iden()) }')

await core00.view.finiTrigTask()
await core00.nodes('[ inet:ipv4=1.2.3.4 ]')

s_tools_backup.backup(path00, path01)

async with self.getTestCore(dirn=path00) as core00:

url = core00.getLocalUrl()
core01conf = {'mirror': url}

async with await s_cortex.Cortex.anit(dirn=path01, conf=core01conf) as core01:
# ensure sync by forcing node construction
await core01.nodes('[ou:org=*]')
self.nn(await core00.callStorm('return($lib.queue.gen(foo).pop(wait=$lib.true))'))
self.none(await core00.callStorm('return($lib.queue.gen(foo).pop())'))

await core01.nodes('[inet:ipv4=8.8.8.8]')
self.nn(await core01.callStorm('return($lib.queue.gen(foo).pop(wait=$lib.true))'))
self.none(await core00.callStorm('return($lib.queue.gen(foo).pop())'))
self.none(await core01.callStorm('return($lib.queue.gen(foo).pop())'))

self.nn(core00.view.trigtask)
self.none(core01.view.trigtask)

async def test_trigger_recursion(self):
async with self.getTestCore() as core:
tdef = {'cond': 'node:add', 'form': 'test:guid', 'storm': '[ test:guid="*" ]'}
Expand Down

0 comments on commit 4c6385e

Please sign in to comment.