Skip to content

Commit

Permalink
Merge 0b5493f into 0cc8029
Browse files Browse the repository at this point in the history
  • Loading branch information
invisig0th committed Jun 2, 2017
2 parents 0cc8029 + 0b5493f commit 6a2cefb
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 26 deletions.
2 changes: 1 addition & 1 deletion synapse/axon.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ def __init__(self, axondir, **opts):
# create a reactor to unwrap core/heap sync events
self.syncact = s_reactor.Reactor()
self.syncact.act('heap:sync', self.heap.sync )
self.syncact.act('core:sync', self.core.sync )
self.syncact.act('core:sync', self.core.coresync )

# wrap core/heap sync events as axon:sync events
self.core.on('core:sync', self._fireAxonSync )
Expand Down
80 changes: 63 additions & 17 deletions synapse/cores/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,16 +909,54 @@ def syncs(self, msgs):

def sync(self, mesg):
'''
Feed the cortex a sync event to ingest changes from another.
Feed the cortex a sync event to make changes to the hypergraph.
Args:
mesg (str,dict): The (name,info) for the sync event.
Returns:
None
Example:
core0.on('core:sync', core1.sync )
core.sync(mesg)
'''
self.syncact.react(mesg)

def onsync(self, func):
'''
Register a callback for all sync events within the cortex.
Args:
func (function((name,info))): The event callback ( see EventBus.on )
Returns:
None
'''
def unwrap(mesg):
func(mesg[1].get('mesg'))

self.on('core:sync', unwrap)

def coresync(self, mesg):
'''
Unwrap and sync() a core:sync event from another cortex.
Args:
mesg ((str,dict)): An event tuple of ('core:sync',{'mesg':<mesg>}).
Returns:
None
# model changes to core0 will populate in core1
Example:
core0.on('core:sync', core1.coresync)
'''
self.syncact.react( mesg[1].get('mesg') )
sync = mesg[1].get('mesg')
if mesg is not None:
self.sync(sync)

# TODO
#def setSyncDir(self, path):
Expand All @@ -930,21 +968,22 @@ def sync(self, mesg):

def addSyncFd(self, fd):
'''
Write all core:sync events to the specified file-like object.
Write all cortex sync events to the specified file-like object.
Example:
fd = open('audit.mpk','r+b')
core.addSyncFd(fd)
'''
def saveobj(m):
fd.write( msgenpack(m) )
# speed hacking
fd.write(msgenpack(m[1].get('mesg')))

self.on('core:sync', saveobj)

def eatSyncFd(self, fd):
'''
Consume and sync all core:sync messages from the given file-like object.
Consume all coretex sync events from the given file-like object.
Example:
Expand Down Expand Up @@ -996,7 +1035,7 @@ def setSaveFd(self, fd, load=True, fini=False):
core.setSaveFd(fd)
NOTE: This save file is allowed to be storage layer specific.
If you want to store core:sync events, use addSyncFd().
If you want to store cortex sync events, use addSyncFd().
'''
if load:
Expand Down Expand Up @@ -2569,21 +2608,28 @@ def getSyncPump(self,core):
core.formTufoByProp('inet:fqdn','vertex.link')
'''
pump = s_queue.Queue()
# unwrap the core:sync event
def putsync(mesg):
sync = mesg[1].get('mesg')
pump.put(sync)

self.on('core:sync', pump.put)
pump = s_queue.Queue()
self.on('core:sync', putsync)

def syncpump():
try:

for msgs in pump.slices(1000):
errs = core.syncs(msgs)
while not self.isfini:

try:

for err in errs:
logger.warning('sync pump: %r' % (err,))
for msgs in pump.slices(1000):
errs = core.syncs(msgs)

except Exception as e:
raise
for err in errs:
logger.warning('sync pump: %r' % (err,))

except Exception as e:
logger.exception(e)

wrkr = s_threads.worker(syncpump)
pump.onfini( wrkr.fini )
Expand Down
2 changes: 1 addition & 1 deletion synapse/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def _getExcMsg(self):
def items(self):
return self.errinfo.items()

class NoSuchAct(SynErr):pass
class NoSuchOpt(SynErr):pass
class NoSuchDir(SynErr):pass
class NoSuchDyn(SynErr):pass
Expand Down Expand Up @@ -67,7 +68,6 @@ class DupUser(Exception):pass
class DupRole(Exception):pass

class NoSuch(Exception):pass
class NoSuchAct(Exception):pass
class NoSuchJob(Exception):pass
class NoSuchObj(SynErr):pass
class NoSuchFile(Exception):pass
Expand Down
2 changes: 1 addition & 1 deletion synapse/lib/scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def getsync(text, tags=()):
with s_cortex.openurl('ram://'):

core.setConfOpt('enforce',1)
core.on('core:sync', ret.append)
core.onsync(ret.append)

for form,valu in scrape(text):
tufo = core.formTufoByFrob(form,valu)
Expand Down
2 changes: 1 addition & 1 deletion synapse/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ def react(self, mesg):
'''
func = self.actfuncs.get(mesg[0])
if func == None:
raise NoSuchAct(mesg[0])
raise NoSuchAct(name=mesg[0])

return func(mesg)
10 changes: 5 additions & 5 deletions synapse/tests/test_cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,11 +688,11 @@ def test_cortex_tags(self):

core.fini()

def test_cortex_sync(self):
def test_cortex_coresync(self):
core0 = s_cortex.openurl('ram://')
core1 = s_cortex.openurl('ram://')

core0.on('core:sync', core1.sync )
core0.onsync(core1.sync)

tufo0 = core0.formTufoByProp('foo','bar',baz='faz')
tufo1 = core1.getTufoByProp('foo','bar')
Expand Down Expand Up @@ -1495,15 +1495,15 @@ def test_cortex_tagform(self):

def test_cortex_syncs_errs(self):

syncs = [ ('core:sync', {'mesg':('newp:fake',{})} ) ]
syncs = [ ('newp:fake',{}) ]
with s_cortex.openurl('ram:///') as core:
core.on('core:sync', syncs.append )
core.onsync(syncs.append)
core.formTufoByProp('inet:fqdn','vertex.link')

with s_cortex.openurl('ram:///') as core:
errs = core.syncs( syncs )
self.eq( len(errs), 1 )
self.eq( errs[0][0][1]['mesg'][0], 'newp:fake' )
self.eq( errs[0][0][0], 'newp:fake' )
self.nn( core.getTufoByProp('inet:fqdn','vertex.link') )

def test_cortex_norm_fail(self):
Expand Down

0 comments on commit 6a2cefb

Please sign in to comment.