From b97122343beba0001fc27d2611d73c51762c8902 Mon Sep 17 00:00:00 2001 From: invisig0th Date: Wed, 24 May 2017 21:20:33 -0400 Subject: [PATCH] figured out a better way to do telepath client side methods and gutted the crazy meta-class builder --- synapse/async.py | 8 +- synapse/axon.py | 3 +- synapse/cores/common.py | 129 ++++++++++++++++----------------- synapse/daemon.py | 9 +++ synapse/dyndeps.py | 12 +++ synapse/exc.py | 16 ++++ synapse/lib/mixins.py | 4 +- synapse/lib/reflect.py | 25 +++++++ synapse/telepath.py | 94 +++++++++++++++++------- synapse/tests/test_axon.py | 18 +++++ synapse/tests/test_telepath.py | 17 +++++ 11 files changed, 235 insertions(+), 100 deletions(-) diff --git a/synapse/async.py b/synapse/async.py index 79c69f7273..aa69e4c45b 100644 --- a/synapse/async.py +++ b/synapse/async.py @@ -27,10 +27,16 @@ def jobret(job): ''' err = job[1].get('err') + + # populate errinfo into SynErr + info = job[1].get('errinfo') + if info == None: + info = {} + if err != None: if err != 'NameErr': try: - raise synerr(err) + raise synerr(err,**info) except NameError as e: pass raise JobErr(job) diff --git a/synapse/axon.py b/synapse/axon.py index c4455608b3..3df0b17e5a 100644 --- a/synapse/axon.py +++ b/synapse/axon.py @@ -160,7 +160,7 @@ class AxonMixin: The parts of the Axon which must be executed locally in proxy cases. ( used as mixin for both Axon and AxonProxy ) ''' - + @s_telepath.clientside def eatfd(self, fd): ''' Consume the contents of a file object into the axon as a blob. @@ -189,6 +189,7 @@ def eatfd(self, fd): return retn + @s_telepath.clientside def eatbytes(self, byts): ''' Consume a buffer of bytes into the axon as a blob. diff --git a/synapse/cores/common.py b/synapse/cores/common.py index 5ce1bc7239..e2f0245918 100644 --- a/synapse/cores/common.py +++ b/synapse/cores/common.py @@ -58,78 +58,13 @@ def reqstor(name,valu): raise BadPropValu(name=name,valu=valu) return valu -class CortexMixin: - - def __init__(self): - pass - - def formNodeByBytes(self, byts, stor=True, **props): - ''' - Form a new file:bytes node by passing bytes and optional props. - - If stor=False is specified, the cortex will create the file:bytes - node even if it is not configured with access to an axon to store - the bytes. - - Example: - - core.formNodeByBytes(byts,name='foo.exe') - - ''' - - hset = s_hashset.HashSet() - hset.update(byts) - - iden,info = hset.guid() - - props.update(info) - - if stor: - - size = props.get('size') - upid = self._getAxonWants('guid',iden,size) - - if upid != None: - for chun in chunks(byts,10000000): - self._addAxonChunk(upid,chun) - - return self.formTufoByProp('file:bytes', iden, **props) - - def formNodeByFd(self, fd, stor=True, **props): - ''' - Form a new file:bytes node by passing a file object and optional props. - ''' - hset = s_hashset.HashSet() - iden,info = hset.eatfd(fd) - - props.update(info) - - - if stor: - - size = props.get('size') - upid = self._getAxonWants('guid',iden,size) - - # time to send it! - if upid != None: - for byts in iterfd(fd): - self._addAxonChunk(upid,byts) - - node = self.formTufoByProp('file:bytes', iden, **props) - - if node[1].get('file:bytes:size') == None: - self.setTufoProp(node,'size',info.get('size')) - - return node - -class Cortex(EventBus,DataModel,Runtime,Configable,CortexMixin,s_ingest.IngestApi): +class Cortex(EventBus,DataModel,Runtime,Configable,s_ingest.IngestApi): ''' Top level Cortex key/valu storage object. ''' def __init__(self, link, **conf): Runtime.__init__(self) EventBus.__init__(self) - CortexMixin.__init__(self) Configable.__init__(self) # a cortex may have a ref to an axon @@ -2231,7 +2166,67 @@ def savemesg(mesg): ''' self.savebus.link(func) - # FIXME addSyncLink() + @s_telepath.clientside + def formNodeByBytes(self, byts, stor=True, **props): + ''' + Form a new file:bytes node by passing bytes and optional props. + + If stor=False is specified, the cortex will create the file:bytes + node even if it is not configured with access to an axon to store + the bytes. + + Example: + + core.formNodeByBytes(byts,name='foo.exe') + + ''' + + hset = s_hashset.HashSet() + hset.update(byts) + + iden,info = hset.guid() + + props.update(info) + + if stor: + + size = props.get('size') + upid = self._getAxonWants('guid',iden,size) + + if upid != None: + for chun in chunks(byts,10000000): + self._addAxonChunk(upid,chun) + + return self.formTufoByProp('file:bytes', iden, **props) + + @s_telepath.clientside + def formNodeByFd(self, fd, stor=True, **props): + ''' + Form a new file:bytes node by passing a file object and optional props. + ''' + hset = s_hashset.HashSet() + iden,info = hset.eatfd(fd) + + props.update(info) + + + if stor: + + size = props.get('size') + upid = self._getAxonWants('guid',iden,size) + + # time to send it! + if upid != None: + for byts in iterfd(fd): + self._addAxonChunk(upid,byts) + + node = self.formTufoByProp('file:bytes', iden, **props) + + if node[1].get('file:bytes:size') == None: + self.setTufoProp(node,'size',info.get('size')) + + return node + def _okSetProp(self, prop): # check for enforcement and validity of a full prop name diff --git a/synapse/daemon.py b/synapse/daemon.py index a4baf34ec5..4da9ee5af7 100644 --- a/synapse/daemon.py +++ b/synapse/daemon.py @@ -301,6 +301,7 @@ def __init__(self, pool=None): self.socks = {} # sockets by iden self.shared = {} # objects provided by daemon self.pushed = {} # objects provided by sockets + self.csides = {} # item:[ (name,path), ... ] self.reflect = {} # objects reflect info by name self._dmon_links = [] # list of listen links @@ -433,6 +434,7 @@ def _onTelePushMesg(self, sock, mesg): jid = mesg[1].get('jid') name = mesg[1].get('name') + csides = mesg[1].get('csides') reflect = mesg[1].get('reflect') user = sock.get('syn:user') @@ -446,6 +448,7 @@ def onfini(): sock.onfini(onfini) self.pushed[name] = sock + self.csides[name] = csides self.reflect[name] = reflect return sock.tx( tufo('job:done', jid=jid) ) @@ -556,6 +559,7 @@ def _onTeleSynMesg(self, sock, mesg): } if name != None: + ret['csides'] = self.csides.get(name) ret['reflect'] = self.reflect.get(name) # send a nonce along for the ride in case @@ -681,6 +685,10 @@ def _onTeleCallMesg(self, sock, mesg): if func == None: raise NoSuchMeth(meth) + if getattr(func,'_tele_clientside',False): + name = s_reflect.getMethName(func) + raise TeleClientSide(name=name) + ret = func(*args,**kwargs) # handle generator returns specially @@ -775,6 +783,7 @@ def share(self, name, item, fini=False): ''' self.shared[name] = item self.reflect[name] = s_reflect.getItemInfo(item) + self.csides[name] = s_telepath.getClientSides(item) if fini: self.onfini( item.fini ) diff --git a/synapse/dyndeps.py b/synapse/dyndeps.py index ec5013aec8..ff097bc8ac 100644 --- a/synapse/dyndeps.py +++ b/synapse/dyndeps.py @@ -69,6 +69,18 @@ def getDynLocal(name): return getattr(mod,objname,None) +def getDynMeth(name): + ''' + Retrieve and return an unbound method by python path. + ''' + cname,fname = name.rsplit('.',1) + + clas = getDynLocal(cname) + if clas is None: + return None + + return getattr(clas,fname,None) + def tryDynMod(name): ''' Dynamically import a python module or exception. diff --git a/synapse/exc.py b/synapse/exc.py index 7d7d683707..31ea82807e 100644 --- a/synapse/exc.py +++ b/synapse/exc.py @@ -14,6 +14,20 @@ def _getExcMsg(self): def items(self): return self.errinfo.items() + def get(self, name): + ''' + Return a value from the errinfo dict. + + Example: + + try: + foothing() + except SynErr as e: + blah = e.get('blah') + + ''' + return self.errinfo.get(name) + class NoSuchOpt(SynErr):pass class NoSuchDir(SynErr):pass class NoSuchDyn(SynErr):pass @@ -60,6 +74,8 @@ class BadMesgResp(Exception):pass class BadPropValu(SynErr):pass class BadPySource(Exception):pass +class TeleClientSide(SynErr):pass + class HitStormLimit(SynErr):pass class DupOpt(Exception):pass diff --git a/synapse/lib/mixins.py b/synapse/lib/mixins.py index 21c36784b7..4448576549 100644 --- a/synapse/lib/mixins.py +++ b/synapse/lib/mixins.py @@ -16,7 +16,7 @@ def addSynMixin(subsys, name, cname=None): Example: - s_mixins.addSynMixin('telepath','synapse.axon.AxonMixin') + s_mixins.addSynMixin('foo','synapse.foo.FooMixin') ''' if cname == None: @@ -37,5 +37,3 @@ def getSynMixins(subsys,name): if not names: return () return [ s_dyndeps.getDynLocal(name) for name in names ] - -#addTeleMixin('synapse.axon.AxonMixin') diff --git a/synapse/lib/reflect.py b/synapse/lib/reflect.py index f878a600a5..8632474117 100644 --- a/synapse/lib/reflect.py +++ b/synapse/lib/reflect.py @@ -15,6 +15,31 @@ def getClsNames(item): mro = [ c for c in mro if c not in clsskip ] return [ '%s.%s' % (c.__module__,c.__name__) for c in mro ] +def getMethName(meth): + ''' + Return a fully qualified string for the .. name + of a given method. + ''' + item = meth.__self__ + mname = item.__module__ + cname = item.__class__.__name__ + fname = meth.__func__.__name__ + return '.'.join((mname,cname,fname)) + +def getItemLocals(item): + ''' + Iterate the locals of an item and yield (name,valu) pairs. + + Example: + + for name,valu in getItemLocals(item): + dostuff() + + ''' + for name in dir(item): + valu = getattr(item,name,None) + yield name,valu + def getItemInfo(item): ''' Get "reflection info" dict for the given object. diff --git a/synapse/telepath.py b/synapse/telepath.py index 2ab8aed51f..6d39100974 100644 --- a/synapse/telepath.py +++ b/synapse/telepath.py @@ -27,8 +27,6 @@ from synapse.common import * from synapse.compat import queue -s_mixins.addSynMixin('telepath','synapse.axon.AxonMixin') - # telepath protocol version # ( compat breaks only at major ver ) telever = (1,0) @@ -85,30 +83,7 @@ def openlink(link): synack = teleSynAck(sock, name=name) bases = () - inherits = () - - refl = synack.get('reflect') - if refl != None: - inherits = refl.get('inherits',()) - - return getMixClass(inherits)(relay,sock=sock) - -classcache = {} -def getMixClass(inherits): - base = [Proxy,] - - for name in inherits: - for mixin in s_mixins.getSynMixins('telepath',name): - base.append(mixin) - - inherit = tuple(base) - - clas = classcache.get(inherit) - if clas == None: - clas = type('Proxy',inherit,{}) - classcache[inherit] = clas - - return clas + return Proxy(relay,sock=sock) def evalurl(url,**opts): ''' @@ -137,11 +112,22 @@ class Method: def __init__(self, proxy, meth): self.meth = meth + self.cside = None self.proxy = proxy + # act as much like a bound method as possible... + self.__name__ = meth + self.__self__ = proxy + def __call__(self, *args, **kwargs): + + # check if we have a cached client-side function + if self.cside != None: + return self.cside(self.proxy,*args,**kwargs) + ondone = kwargs.pop('ondone',None) task = (self.meth,args,kwargs) + job = self.proxy._tx_call( task, ondone=ondone ) if ondone != None: return job @@ -202,6 +188,7 @@ def __init__(self, relay, plex=None, sock=None): self._tele_relay = relay # LinkRelay() self._tele_link = relay.link self._tele_yields = {} + self._tele_csides = {} self._tele_reflect = None # obj name is path minus leading "/" @@ -334,8 +321,9 @@ def push(self, name, item): prox.push( 'bar', Bar() ) ''' + csides = getClientSides(item) reflect = s_reflect.getItemInfo(item) - job = self._txTeleJob('tele:push', name=name, reflect=reflect) + job = self._txTeleJob('tele:push', name=name, reflect=reflect, csides=csides) self._tele_pushed[ name ] = item return self.syncjob(job) @@ -481,6 +469,10 @@ def _teleSynAck(self, sock): self._tele_sid = synack.get('sess') self._tele_reflect = synack.get('reflect') + csides = synack.get('csides') + if csides is not None: + self._tele_csides.update(csides) + hisopts = synack.get('opts',{}) if hisopts.get('sock:can:gzip'): @@ -524,7 +516,17 @@ def _onProxyFini(self): self._tele_pool.fini() def __getattr__(self, name): - meth = Method(self, name) + path = self._tele_csides.get(name) + if path is not None: + func = s_dyndeps.getDynMeth(path) + if func is None: + return None + + meth =func.__get__(self) + + else: + meth = Method(self, name) + setattr(self,name,meth) return meth @@ -564,3 +566,39 @@ def teleSynAck(sock, name=None, sid=None): raise BadMesgVers(myver=telever,hisver=vers) return synack + +def clientside(f): + ''' + A function decorator which causes the given function to be run on + the telepath client side. + + Example: + + @s_telepath.clientside + def foo(self, bar): + dostuff() + + NOTE: you *must* use APIs within the function to access locals etc. + ( ie, the method must be able to have self be a telepath proxy ) + ''' + f._tele_clientside = True + return f + +def getClientSides(item): + ''' + Return a dict of name:path pairs for any clientside functions in item. + ''' + retn = {} + + for name,valu in s_reflect.getItemLocals(item): + + if not getattr(valu,'_tele_clientside',False): + continue + + path = s_reflect.getMethName(valu) + if path == None: + continue + + retn[name] = path + + return retn diff --git a/synapse/tests/test_axon.py b/synapse/tests/test_axon.py index 0e33feae70..91821dff8d 100644 --- a/synapse/tests/test_axon.py +++ b/synapse/tests/test_axon.py @@ -5,6 +5,7 @@ import synapse.axon as s_axon import synapse.daemon as s_daemon import synapse.lib.heap as s_heap +import synapse.daemon as s_daemon import synapse.telepath as s_telepath import synapse.lib.service as s_service @@ -614,3 +615,20 @@ def test_axon_get_renameprops(self, *args, **kwargs): self.assertGreater(actual['st_atime'], 1000000000) self.assertGreater(actual['st_ctime'], 1000000000) self.assertGreater(actual['st_mtime'], 1000000000) + + def test_axon_telepath(self): + with self.getTestDir() as dirname: + + with s_daemon.Daemon() as dmon: + + link = dmon.listen('tcp://127.0.0.1:0/') + port = link[1].get('port') + + with s_axon.Axon(dirname) as axon: + dmon.share('axon',axon) + + prox = s_telepath.openurl('tcp://127.0.0.1/axon', port=port) + + with io.BytesIO(b'vertex') as fd: + blob = prox.eatfd(fd) + print(blob) diff --git a/synapse/tests/test_telepath.py b/synapse/tests/test_telepath.py index 6b67d325d8..521fc99936 100644 --- a/synapse/tests/test_telepath.py +++ b/synapse/tests/test_telepath.py @@ -27,6 +27,10 @@ def echo(self, x): def speed(self): return + @s_telepath.clientside + def localthing(self, x): + return self.echo(x) + class TelePathTest(SynTest): def getFooServ(self): @@ -379,3 +383,16 @@ def on(*args, **kwargs): wait.wait() self.assertEqual(counters[0], 1) self.assertEqual(counters[1], 1) + + def test_telepath_clientside(self): + + with s_daemon.Daemon() as dmon: + + link = dmon.listen('tcp://127.0.0.1:0/') + port = link[1].get('port') + + dmon.share('foo',Foo()) + + with s_telepath.openurl('tcp://127.0.0.1/foo', port=port) as prox: + self.eq( prox.localthing(20), 20 ) + self.eq( prox.localthing(30), 30 )