diff --git a/synapse/axon.py b/synapse/axon.py index c4455608b3..75c5e07bac 100644 --- a/synapse/axon.py +++ b/synapse/axon.py @@ -466,7 +466,7 @@ def __init__(self, axondir, **opts): # create a reactor to unwrap core/heap sync events self.syncact = s_reactor.Reactor() self.syncact.act('heap:sync', self.heap.sync ) - self.syncact.act('core:sync', self.core.sync ) + self.syncact.act('core:sync', self.core.coresync ) # wrap core/heap sync events as axon:sync events self.core.on('core:sync', self._fireAxonSync ) diff --git a/synapse/cores/common.py b/synapse/cores/common.py index 5fabec1145..12e14863b4 100644 --- a/synapse/cores/common.py +++ b/synapse/cores/common.py @@ -909,16 +909,54 @@ def syncs(self, msgs): def sync(self, mesg): ''' - Feed the cortex a sync event to ingest changes from another. + Feed the cortex a sync event to make changes to the hypergraph. + + Args: + mesg (str,dict): The (name,info) for the sync event. + + Returns: + None Example: - core0.on('core:sync', core1.sync ) + core.sync(mesg) + + ''' + self.syncact.react(mesg) + + def onsync(self, func): + ''' + Register a callback for all sync events within the cortex. + + Args: + func (function((name,info))): The event callback ( see EventBus.on ) + + Returns: + None + ''' + def unwrap(mesg): + func(mesg[1].get('mesg')) + + self.on('core:sync', unwrap) + + def coresync(self, mesg): + ''' + Unwrap and sync() a core:sync event from another cortex. + + Args: + mesg ((str,dict)): An event tuple of ('core:sync',{'mesg':}). + + Returns: + None - # model changes to core0 will populate in core1 + Example: + + core0.on('core:sync', core1.coresync) ''' - self.syncact.react( mesg[1].get('mesg') ) + sync = mesg[1].get('mesg') + if mesg is not None: + self.sync(sync) # TODO #def setSyncDir(self, path): @@ -930,7 +968,7 @@ def sync(self, mesg): def addSyncFd(self, fd): ''' - Write all core:sync events to the specified file-like object. + Write all cortex sync events to the specified file-like object. Example: @@ -938,13 +976,14 @@ def addSyncFd(self, fd): core.addSyncFd(fd) ''' def saveobj(m): - fd.write( msgenpack(m) ) + # speed hacking + fd.write(msgenpack(m[1].get('mesg'))) self.on('core:sync', saveobj) def eatSyncFd(self, fd): ''' - Consume and sync all core:sync messages from the given file-like object. + Consume all coretex sync events from the given file-like object. Example: @@ -996,7 +1035,7 @@ def setSaveFd(self, fd, load=True, fini=False): core.setSaveFd(fd) NOTE: This save file is allowed to be storage layer specific. - If you want to store core:sync events, use addSyncFd(). + If you want to store cortex sync events, use addSyncFd(). ''' if load: @@ -2569,21 +2608,28 @@ def getSyncPump(self,core): core.formTufoByProp('inet:fqdn','vertex.link') ''' - pump = s_queue.Queue() + # unwrap the core:sync event + def putsync(mesg): + sync = mesg[1].get('mesg') + pump.put(sync) - self.on('core:sync', pump.put) + pump = s_queue.Queue() + self.on('core:sync', putsync) def syncpump(): - try: - for msgs in pump.slices(1000): - errs = core.syncs(msgs) + while not self.isfini: + + try: - for err in errs: - logger.warning('sync pump: %r' % (err,)) + for msgs in pump.slices(1000): + errs = core.syncs(msgs) - except Exception as e: - raise + for err in errs: + logger.warning('sync pump: %r' % (err,)) + + except Exception as e: + logger.exception(e) wrkr = s_threads.worker(syncpump) pump.onfini( wrkr.fini ) diff --git a/synapse/exc.py b/synapse/exc.py index 7d7d683707..0e80ad639b 100644 --- a/synapse/exc.py +++ b/synapse/exc.py @@ -14,6 +14,7 @@ def _getExcMsg(self): def items(self): return self.errinfo.items() +class NoSuchAct(SynErr):pass class NoSuchOpt(SynErr):pass class NoSuchDir(SynErr):pass class NoSuchDyn(SynErr):pass @@ -67,7 +68,6 @@ class DupUser(Exception):pass class DupRole(Exception):pass class NoSuch(Exception):pass -class NoSuchAct(Exception):pass class NoSuchJob(Exception):pass class NoSuchObj(SynErr):pass class NoSuchFile(Exception):pass diff --git a/synapse/lib/scrape.py b/synapse/lib/scrape.py index 13cdce34b9..f661c45068 100644 --- a/synapse/lib/scrape.py +++ b/synapse/lib/scrape.py @@ -49,7 +49,7 @@ def getsync(text, tags=()): with s_cortex.openurl('ram://'): core.setConfOpt('enforce',1) - core.on('core:sync', ret.append) + core.onsync(ret.append) for form,valu in scrape(text): tufo = core.formTufoByFrob(form,valu) diff --git a/synapse/reactor.py b/synapse/reactor.py index 2e7638c64c..53dd8e91ea 100644 --- a/synapse/reactor.py +++ b/synapse/reactor.py @@ -48,6 +48,6 @@ def react(self, mesg): ''' func = self.actfuncs.get(mesg[0]) if func == None: - raise NoSuchAct(mesg[0]) + raise NoSuchAct(name=mesg[0]) return func(mesg) diff --git a/synapse/tests/test_cortex.py b/synapse/tests/test_cortex.py index faef7a4fd8..f88f03ac69 100644 --- a/synapse/tests/test_cortex.py +++ b/synapse/tests/test_cortex.py @@ -688,11 +688,11 @@ def test_cortex_tags(self): core.fini() - def test_cortex_sync(self): + def test_cortex_coresync(self): core0 = s_cortex.openurl('ram://') core1 = s_cortex.openurl('ram://') - core0.on('core:sync', core1.sync ) + core0.onsync(core1.sync) tufo0 = core0.formTufoByProp('foo','bar',baz='faz') tufo1 = core1.getTufoByProp('foo','bar') @@ -1495,15 +1495,15 @@ def test_cortex_tagform(self): def test_cortex_syncs_errs(self): - syncs = [ ('core:sync', {'mesg':('newp:fake',{})} ) ] + syncs = [ ('newp:fake',{}) ] with s_cortex.openurl('ram:///') as core: - core.on('core:sync', syncs.append ) + core.onsync(syncs.append) core.formTufoByProp('inet:fqdn','vertex.link') with s_cortex.openurl('ram:///') as core: errs = core.syncs( syncs ) self.eq( len(errs), 1 ) - self.eq( errs[0][0][1]['mesg'][0], 'newp:fake' ) + self.eq( errs[0][0][0], 'newp:fake' ) self.nn( core.getTufoByProp('inet:fqdn','vertex.link') ) def test_cortex_norm_fail(self):