Browse files

Merge pull request #11007 from SmithSamuelM/sam_raet_16

Sam raet 16  Support for msgpack raet bodies.  Preliminary support for persisting Raet estate and key data
  • Loading branch information...
2 parents fb3c89d + 36c9487 commit 589eba548848a63c750202f1554b3132c79c17b1 @SmithSamuelM SmithSamuelM committed Mar 6, 2014
View
2 salt/transport/road/raet/behaving.py
@@ -56,7 +56,7 @@ class StackUdpRaet(deeding.Deed): # pylint: disable=W0232
stack='stack',
txmsgs=odict(ipath='txmsgs', ival=deque()),
rxmsgs=odict(ipath='rxmsgs', ival=deque()),
- local=odict(ipath='local', ival=odict( name='minion',
+ local=odict(ipath='local', ival=odict( name='master',
eid=0,
host='0.0.0.0',
port=raeting.RAET_PORT,
View
3 salt/transport/road/raet/estating.py
@@ -95,14 +95,15 @@ class LocalEstate(Estate):
RAET protocol endpoint local estate object
Maintains signer for signing and privateer for encrypt/decript
'''
- def __init__(self, sigkey=None, prikey=None, **kwa):
+ def __init__(self, main=False, sigkey=None, prikey=None, **kwa):
'''
Setup Estate instance
sigkey is either nacl SigningKey or hex encoded key
prikey is either nacl PrivateKey or hex encoded key
'''
super(LocalEstate, self).__init__(**kwa)
+ self.main = True if main else False # main estate for road
self.signer = nacling.Signer(sigkey)
self.priver = nacling.Privateer(prikey) # Long term key
View
63 salt/transport/road/raet/keeping.py
@@ -32,16 +32,20 @@ class Keep(object):
'''
RAET protocol base class for estate data persistence
'''
- def __init__(self, dirpath='', prefix='estate', ext='json', **kwa):
+ def __init__(self, dirpath='', stackname='stack', prefix='estate', ext='json', **kwa):
'''
Setup Keep instance
Create directories for saving associated estate data files
keep/
- local/
- remote/
+ stackname/
+ local/
+ prefix.uid.ext
+ remote/
+ prefix.uid.ext
+ prefix.uid.ext
'''
if not dirpath:
- dirpath = "/tmp/raet/keep"
+ dirpath = os.path.join("/tmp/raet/keep", stackname)
self.dirpath = os.path.abspath(dirpath)
if not os.path.exists(self.dirpath):
os.makedirs(self.dirpath)
@@ -102,7 +106,7 @@ def loadLocalData(self):
return None
return (self.load(self.localfilepath))
- def removeLocalData(self):
+ def clearLocalData(self):
'''
Load and Return the data from the local estate
'''
@@ -135,7 +139,7 @@ def loadRemoteData(self, uid):
return None
return (self.load(filepath))
- def removeRemoteData(self, uid):
+ def clearRemoteData(self, uid):
'''
Load and Return the data from the remote estate file named with uid
'''
@@ -160,7 +164,7 @@ def loadAllRemoteData(self):
data[uid] = self.load(filepath)
return data
- def removeAllRemoteData(self):
+ def clearAllRemoteData(self):
'''
Remove all the remote estate files
'''
@@ -176,10 +180,9 @@ def removeAllRemoteData(self):
if os.path.exists(filepath):
os.remove(filepath)
-
def dumpAllRemoteEstates(self, estates):
'''
- Dump the data from the remote estate
+ Dump the data from all the remote estates
'''
for estate in estates:
self.dumpRemoteEstate(estate)
@@ -215,17 +218,25 @@ def loadRemoteEstate(self, estate):
uid = estate.eid
return (self.loadRemoteData(uid))
- def removeRemoteEstate(self, estate):
+ def clearRemoteEstate(self, estate):
'''
Load and Return the data from the remote estate file
Override this in sub class to change uid
'''
uid = estate.eid
- self.removeRemoteData(uid)
+ self.clearRemoteData(uid)
class RoadKeep(Keep):
'''
RAET protocol estate road (channel) data persistence
+
+ keep/
+ stackname/
+ local/
+ estate.eid.ext
+ remote/
+ estate.eid.ext
+ estate.eid.ext
'''
def __init__(self, prefix='estate', **kwa):
'''
@@ -240,6 +251,7 @@ def dumpLocalEstate(self, estate):
data = odict([
('eid', estate.eid),
('name', estate.name),
+ ('main', estate.main),
('host', estate.host),
('port', estate.port),
('sid', estate.sid)
@@ -270,6 +282,18 @@ class SafeKeep(Keep):
def __init__(self, prefix='key', **kwa):
'''
Setup SafeKeep instance
+
+ keep/
+ local/
+ key.eid.ext
+ remote/
+ key.eid.ext
+ key.eid.ext
+
+ pended/
+ key.eid.ext
+ rejected/
+ key.eid.ext
'''
super(SafeKeep, self).__init__(prefix=prefix, **kwa)
@@ -299,7 +323,7 @@ def dumpRemoteEstate(self, estate):
'''
Dump the data from the remote estate
'''
- uid = estate.name
+ uid = estate.eid
data = odict([
('eid', estate.eid),
('name', estate.name),
@@ -323,11 +347,22 @@ def removeRemoteEstate(self, estate):
Override this in sub class to change uid
'''
uid = estate.name
- self.removeRemoteData(uid)
+ self.clearRemoteData(uid)
- def remoteAcceptStatus(self, estate):
+ def statusRemote(self, estate):
'''
Evaluate acceptance status of estate per its keys
persist key data differentially based on status
'''
return (raeting.acceptance.accepted)
+
+def clearAllRoadSafe(dirpath):
+ '''
+ Convenience function to clear all road and safe keep data in dirpath
+ '''
+ road = RoadKeep(dirpath=dirpath)
+ road.clearLocalData()
+ road.clearAllRemoteData()
+ safe = SafeKeep(dirpath=dirpath)
+ safe.clearLocalData()
+ safe.clearAllRemoteData()
View
58 salt/transport/road/raet/packeting.py
@@ -11,7 +11,11 @@
import simplejson as json
except ImportError:
import json
-import msgpack
+
+try:
+ import msgpack
+except ImportError:
+ mspack = None
# Import ioflo libs
from ioflo.base.odicting import odict
@@ -193,9 +197,11 @@ def pack(self):
self.packed = json.dumps(self.data, separators=(',', ':'))
elif bk == raeting.bodyKinds.msgpack:
if self.data:
+ if not msgpack:
+ emsg = "Msgpack not installed."
+ raise raeting.PacketError(emsg)
self.packed = msgpack.dumps(self.data)
-
- if bk == raeting.bodyKinds.raw:
+ elif bk == raeting.bodyKinds.raw:
self.packed = self.data # data is already formatted string
class RxBody(Body):
@@ -223,10 +229,18 @@ def parse(self):
emsg = "Packet body not a mapping."
raise raeting.PacketError(emsg)
self.data = kit
-
- if bk == raeting.bodyKinds.raw:
+ elif bk == raeting.bodyKinds.msgpack:
+ if self.packed:
+ if not msgpack:
+ emsg = "Msgpack not installed."
+ raise raeting.PacketError(emsg)
+ kit = msgpack.loads(self.packed, object_pairs_hook=odict)
+ if not isinstance(kit, Mapping):
+ emsg = "Packet body not a mapping."
+ raise raeting.PacketError(emsg)
+ self.data = kit
+ elif bk == raeting.bodyKinds.raw:
self.data = self.packed # return as string
-
elif bk == raeting.bodyKinds.nada:
pass
@@ -452,16 +466,16 @@ def index(self):
data = self.data
le = data['se']
if le == 0:
- host = data['sh']
- if host == '0.0.0.0':
- host = '127.0.0.1'
- le = (host, data['sp'])
+ #host = data['sh']
+ #if host == '0.0.0.0':
+ #host = '127.0.0.1'
+ le = (data['sh'], data['sp'])
re = data['de']
if re == 0:
- host = data['dh']
- if host == '0.0.0.0':
- host = '127.0.0.1'
- re = (host, data['dp'])
+ #host = data['dh']
+ #if host == '0.0.0.0':
+ #host = '127.0.0.1'
+ re = (data['dh'], data['dp'])
return ((data['cf'], le, re, data['si'], data['ti'], data['bf']))
def signature(self, msg):
@@ -564,16 +578,16 @@ def index(self):
data = self.data
le = data['de']
if le == 0:
- host = data['dh']
- if host == '0.0.0.0':
- host = '127.0.0.1'
- le = (host, data['dp'])
+ #host = data['dh']
+ #if host == '0.0.0.0':
+ #host = '127.0.0.1'
+ le = (data['dh'], data['dp'])
re = data['se']
if re == 0:
- host = data['sh']
- if host == '0.0.0.0':
- host = '127.0.0.1'
- re = (host, data['sp'])
+ #host = data['sh']
+ #if host == '0.0.0.0':
+ #host = '127.0.0.1'
+ re = (data['sh'], data['sp'])
return ((not data['cf'], le, re, data['si'], data['ti'], data['bf']))
def verify(self, signature, msg):
View
152 salt/transport/road/raet/stacking.py
@@ -42,17 +42,19 @@ class StackUdp(object):
def __init__(self,
name='',
+ main=False,
version=raeting.VERSION,
store=None,
estate=None,
eid=None,
ha=("", raeting.RAET_PORT),
- rxMsgs = None,
- txMsgs = None,
- udpRxes = None,
- udpTxes = None,
- road = None,
- safe = None,
+ rxMsgs=None,
+ txMsgs=None,
+ udpRxes=None,
+ udpTxes=None,
+ road=None,
+ safe=None,
+ dirpath=None,
):
'''
Setup StackUdp instance
@@ -65,22 +67,30 @@ def __init__(self,
self.store = store or storing.Store(stamp=0.0)
self.estates = odict() # remote estates attached to this stack by eid
self.eids = odict() # reverse lookup eid by estate.name
- # local estate for this stack
- self.estate = estate or estating.LocalEstate(stack=self, eid=eid, ha=ha)
self.transactions = odict() #transactions
self.rxMsgs = rxMsgs if rxMsgs is not None else deque() # messages received
self.txMsgs = txMsgs if txMsgs is not None else deque() # messages to transmit
- #(msg, deid) deid=0 is broadcast
self.udpRxes = udpRxes if udpRxes is not None else deque() # udp packets received
self.udpTxes = udpTxes if udpTxes is not None else deque() # udp packet to transmit
- self.road = road or keeping.RoadKeep()
- self.safe = safe or keeping.SafeKeep()
+
+ self.road = road or keeping.RoadKeep(dirpath=dirpath, stackname=self.name)
+ self.safe = safe or keeping.SafeKeep(dirpath=dirpath, stackname=self.name)
+ kept = self.loadLocal() # local estate from saved data
+ # local estate for this stack
+ self.estate = kept or estate or estating.LocalEstate(stack=self,
+ eid=eid,
+ main=main,
+ ha=ha)
+ self.estate.stack = self
self.serverUdp = aiding.SocketUdpNb(ha=self.estate.ha, bufsize=raeting.MAX_MESSAGE_SIZE)
self.serverUdp.reopen() # open socket
self.estate.ha = self.serverUdp.ha # update estate host address after open
+ self.dumpLocal() # save local estate data
- #self.road.dumpLocalEstate(self.estate)
- #self.safe.dumpLocalEstate(self.estate)
+ kepts = self.loadAllRemote() # remote estates from saved data
+ for kept in kepts:
+ self.addRemoteEstate(kept)
+ self.dumpAllRemote() # save remote estate data
def fetchRemoteEstateByHostPort(self, host, port):
'''
@@ -93,6 +103,18 @@ def fetchRemoteEstateByHostPort(self, host, port):
return None
+ def fetchRemoteEstateByKeys(self, sighex, prihex):
+ '''
+ Search for remote estate with matching (name, sighex, prihex)
+ Return estate if found Otherwise return None
+ '''
+ for estate in self.estates.values():
+ if (estate.signer.keyhex == sighex or
+ estate.priver.keyhex == prihex):
+ return estate
+
+ return None
+
def fetchRemoteEstateByName(self, name):
'''
Search for remote estate with matching name
@@ -167,6 +189,108 @@ def removeRemoteEstate(self, eid):
del self.estates[eid]
del self.eids[estate.name]
+ def clearLocal(self):
+ '''
+ Clear local keeps
+ '''
+ self.road.clearLocalData()
+ self.safe.clearLocalData()
+
+ def clearRemote(self, estate):
+ '''
+ Clear remote keeps of estate
+ '''
+ self.road.clearRemoteEstate()
+ self.safe.clearRemoteEstate()
+
+ def clearAllRemote(self):
+ '''
+ Clear all remote keeps
+ '''
+ self.road.clearAllRemoteData()
+ self.safe.clearAllRemoteData()
+
+ def dumpLocal(self):
+ '''
+ Dump keeps of local estate
+ '''
+ self.road.dumpLocalEstate(self.estate)
+ self.safe.dumpLocalEstate(self.estate)
+
+ def dumpRemote(self, estate):
+ '''
+ Dump keeps of estate
+ '''
+ self.road.dumpRemoteEstate(estate)
+ self.safe.dumpRemoteEstate(estate)
+
+ def dumpRemoteByEid(self, eid):
+ '''
+ Dump keeps of estate given by eid
+ '''
+ estate = self.estates.get(eid)
+ if estate:
+ self.dumpRemote(estate)
+
+ def dumpAllRemote(self):
+ '''
+ Dump all remotes estates to keeps'''
+ self.road.dumpAllRemoteEstates(self.estates.values())
+ self.safe.dumpAllRemoteEstates(self.estates.values())
+
+ def loadLocal(self):
+ '''
+ Load and Return local estate if keeps found
+ '''
+ road = self.road.loadLocalData()
+ safe = self.safe.loadLocalData()
+ if not road or not safe:
+ return None
+ estate = estating.LocalEstate(stack=self,
+ eid=road['eid'],
+ name=road['name'],
+ main=road['main'],
+ host=road['host'],
+ port=road['port'],
+ sid=road['sid'],
+ sigkey=safe['sighex'],
+ prikey=safe['prihex'],)
+ return estate
+
+ def loadAllRemote(self):
+ '''
+ Load and Return list of remote estates
+ remote = estating.RemoteEstate( stack=self.stack,
+ name=name,
+ host=data['sh'],
+ port=data['sp'],
+ verkey=verhex,
+ pubkey=pubhex,
+ rsid=self.sid,
+ rtid=self.tid, )
+ self.stack.addRemoteEstate(remote)
+ '''
+ estates = []
+ roads = self.road.loadAllRemoteData()
+ safes = self.safe.loadAllRemoteData()
+ if not roads or not safes:
+ return []
+ for key, road in roads.items():
+ if key not in safes:
+ continue
+ safe = safes[key]
+ estate = estating.RemoteEstate( stack=self,
+ eid=road['eid'],
+ name=road['name'],
+ host=road['host'],
+ port=road['port'],
+ sid=road['sid'],
+ rsid=road['rsid'],
+ verkey=safe['verhex'],
+ pubkey=safe['pubhex'],)
+ estates.append(estate)
+ return estates
+
def addTransaction(self, index, transaction):
'''
Safely add transaction at index If not already there
@@ -258,6 +382,8 @@ def txMsg(self, msg, deid=None):
raise raeting.StackError(emsg)
self.txMsgs.append((msg, deid))
+ transmit = txMsg
+
def serviceTxMsg(self):
'''
Service .udpTxMsgs queue of outgoint udp messages for message transactions
View
141 salt/transport/road/raet/test/test_keeping.py
@@ -9,7 +9,7 @@
from ioflo.base.odicting import odict
-from salt.transport.road.raet import (raeting, nacling, estating, keeping)
+from salt.transport.road.raet import (raeting, nacling, estating, keeping, stacking)
def test():
@@ -25,51 +25,120 @@ def test():
masterPubKeyHex = privateer.pubhex
signer = nacling.Signer()
- minionSignKeyHex = signer.keyhex
- minionVerKeyHex = signer.verhex
+ m1SignKeyHex = signer.keyhex
+ m1VerKeyHex = signer.verhex
privateer = nacling.Privateer()
- minionPriKeyHex = privateer.keyhex
- minionPubKeyHex = privateer.pubhex
+ m1PriKeyHex = privateer.keyhex
+ m1PubKeyHex = privateer.pubhex
- #master stack
- estate = estating.LocalEstate(eid=1,
- sigkey=masterSignKeyHex,
- prikey=masterPriKeyHex)
+ signer = nacling.Signer()
+ m2SignKeyHex = signer.keyhex
+ m2VerKeyHex = signer.verhex
+ privateer = nacling.Privateer()
+ m2PriKeyHex = privateer.keyhex
+ m2PubKeyHex = privateer.pubhex
- remote0 = estating.RemoteEstate(eid=2,
+ signer = nacling.Signer()
+ m3SignKeyHex = signer.keyhex
+ m3VerKeyHex = signer.verhex
+ privateer = nacling.Privateer()
+ m3PriKeyHex = privateer.keyhex
+ m3PubKeyHex = privateer.pubhex
+
+ #master stack
+ dirpath = os.path.join(os.getcwd(), 'keep', 'master')
+ estate = estating.LocalEstate( eid=1,
+ name='master',
+ sigkey=masterSignKeyHex,
+ prikey=masterPriKeyHex,)
+ stack0 = stacking.StackUdp(estate=estate, dirpath=dirpath)
+
+ stack0.addRemoteEstate(estating.RemoteEstate(eid=2,
ha=('127.0.0.1', 7532),
- verkey=minionVerKeyHex,
- pubkey=minionPubKeyHex,)
+ verkey=m1VerKeyHex,
+ pubkey=m1PubKeyHex,))
- remote1 = estating.RemoteEstate(eid=3,
+ stack0.addRemoteEstate(estating.RemoteEstate(eid=3,
ha=('127.0.0.1', 7533),
- verkey=minionVerKeyHex,
- pubkey=minionPubKeyHex,)
+ verkey=m2VerKeyHex,
+ pubkey=m2PubKeyHex,))
- pond = keeping.RoadKeep(dirpath=os.getcwd())
- safe = keeping.SafeKeep(dirpath=os.getcwd())
+ #minion stack
+ dirpath = os.path.join(os.getcwd(), 'keep', 'minion1')
+ estate = estating.LocalEstate( eid=2,
+ name='minion1',
+ ha=("", raeting.RAET_TEST_PORT),
+ sigkey=m1SignKeyHex,
+ prikey=m1PriKeyHex,)
+ stack1 = stacking.StackUdp(estate=estate, dirpath=dirpath)
- pond.dumpLocalEstate(estate)
- pond.dumpRemoteEstate(remote0)
- pond.dumpRemoteEstate(remote1)
-
- safe.dumpLocalEstate(estate)
- safe.dumpRemoteEstate(remote0)
- safe.dumpRemoteEstate(remote1)
-
- data = pond.loadLocalData()
- print data
-
- data = pond.loadAllRemoteData()
- print data
-
- data = safe.loadLocalData()
- print data
-
- data = safe.loadAllRemoteData()
- print data
+ stack1.addRemoteEstate(estating.RemoteEstate(eid=1,
+ ha=('127.0.0.1', 7532),
+ verkey=masterVerKeyHex,
+ pubkey=masterPubKeyHex,))
+
+ stack1.addRemoteEstate(estating.RemoteEstate(eid=4,
+ ha=('127.0.0.1', 7534),
+ verkey=m3VerKeyHex,
+ pubkey=m3PubKeyHex,))
+
+ stack0.clearLocal()
+ stack0.clearAllRemote()
+ stack1.clearLocal()
+ stack1.clearAllRemote()
+
+ stack0.dumpLocal()
+ stack0.dumpAllRemote()
+
+ stack1.dumpLocal()
+ stack1.dumpAllRemote()
+
+ print "Road {0}".format(stack0.name)
+ print stack0.road.loadLocalData()
+ print stack0.road.loadAllRemoteData()
+ print "Safe {0}".format(stack0.name)
+ print stack0.safe.loadLocalData()
+ print stack0.safe.loadAllRemoteData()
+ print
+
+ print "Road {0}".format(stack1.name)
+ print stack1.road.loadLocalData()
+ print stack1.road.loadAllRemoteData()
+ print "Safe {0}".format(stack1.name)
+ print stack1.safe.loadLocalData()
+ print stack1.safe.loadAllRemoteData()
+
+ stack0.serverUdp.close()
+ stack1.serverUdp.close()
+ #master stack
+ dirpath = os.path.join(os.getcwd(), 'keep', 'master')
+ estate = estating.LocalEstate( eid=1,
+ name='master',
+ sigkey=masterSignKeyHex,
+ prikey=masterPriKeyHex,)
+ stack0 = stacking.StackUdp(estate=estate, dirpath=dirpath)
+
+ #minion stack
+ dirpath = os.path.join(os.getcwd(), 'keep', 'minion1')
+ estate = estating.LocalEstate( eid=2,
+ name='minion1',
+ ha=("", raeting.RAET_TEST_PORT),
+ sigkey=m1SignKeyHex,
+ prikey=m1PriKeyHex,)
+ stack1 = stacking.StackUdp(estate=estate, dirpath=dirpath)
+
+
+ estate0 = stack0.loadLocal()
+ print estate0.name, estate0.eid, estate0.sid, estate0.ha, estate0.signer, estate0.priver
+ estate1 = stack1.loadLocal()
+ print estate1.name, estate1.eid, estate1.sid, estate1.ha, estate1.signer, estate1.priver
+
+ stack0.clearLocal()
+ stack0.clearAllRemote()
+ stack1.clearLocal()
+ stack1.clearAllRemote()
if __name__ == "__main__":
View
57 salt/transport/road/raet/test/test_packeting.py
@@ -6,76 +6,96 @@
# pylint: skip-file
# pylint: disable=C0103
+import os
+
from ioflo.base.odicting import odict
-from salt.transport.road.raet import (raeting, nacling, packeting,
+from salt.transport.road.raet import (raeting, nacling, packeting, keeping,
estating, transacting, stacking)
-def test():
+def test( bk = raeting.bodyKinds.json):
'''
Test packeting.
'''
- data = odict(hk=1, bk=raeting.bodyKinds.json)
+ data = odict(hk=1, bk=bk)
body = odict(msg='Hello Raet World', extra='what is this')
packet0 = packeting.TxPacket(embody=body, data=data, )
print packet0.body.data
packet0.pack()
print packet0.packed
+ packet1 = packeting.RxPacket(packed=packet0.packed)
+ packet1.parse()
+ print packet1.data
+ print packet1.body.data
stuff = []
for i in range(300):
stuff.append(str(i).rjust(4, " "))
-
stuff = "".join(stuff)
-
data.update(bk=raeting.bodyKinds.raw)
packet0 = packeting.TxPacket(embody=stuff, data=data, )
packet0.pack()
print packet0.packed
+ packet1 = packeting.RxPacket(packed=packet0.packed)
+ packet1.parse()
+ print packet1.data
+ print packet1.body.data
rejoin = []
if packet0.segmented:
for index, segment in packet0.segments.items():
print index, segment.packed
rejoin.append(segment.body.packed)
-
rejoin = "".join(rejoin)
print stuff == rejoin
+ #master stack
+ masterName = "master"
signer = nacling.Signer()
masterSignKeyHex = signer.keyhex
masterVerKeyHex = signer.verhex
privateer = nacling.Privateer()
masterPriKeyHex = privateer.keyhex
masterPubKeyHex = privateer.pubhex
+ dirpathMaster = os.path.join(os.getcwd(), 'keep', masterName)
+ #minion stack
+ minionName = "minion"
signer = nacling.Signer()
minionSignKeyHex = signer.keyhex
minionVerKeyHex = signer.verhex
privateer = nacling.Privateer()
minionPriKeyHex = privateer.keyhex
minionPubKeyHex = privateer.pubhex
+ dirpathMinion = os.path.join(os.getcwd(), 'keep', minionName)
- #master stack
- estate = estating.LocalEstate(eid=1,
- sigkey=masterSignKeyHex,
- prikey=masterPriKeyHex)
- stack0 = stacking.StackUdp(estate=estate)
+ keeping.clearAllRoadSafe(dirpathMaster)
+ keeping.clearAllRoadSafe(dirpathMinion)
- remote1 = estating.RemoteEstate( eid=2,
+ estate = estating.LocalEstate( eid=1,
+ name=masterName,
+ sigkey=masterSignKeyHex,
+ prikey=masterPriKeyHex)
+ stack0 = stacking.StackUdp(estate=estate, main=True, dirpath=dirpathMaster)
+
+ remote1 = estating.RemoteEstate( eid=2,
+ name=minionName,
+ ha=("127.0.0.1", raeting.RAET_TEST_PORT),
verkey=minionVerKeyHex,
pubkey=minionPubKeyHex,)
stack0.addRemoteEstate(remote1)
- #minion stack
+
estate = estating.LocalEstate( eid=2,
+ name=minionName,
ha=("", raeting.RAET_TEST_PORT),
sigkey=minionSignKeyHex,
prikey=minionPriKeyHex,)
stack1 = stacking.StackUdp(estate=estate)
remote0 = estating.RemoteEstate( eid=1,
+ name=masterName,
ha=('127.0.0.1', raeting.RAET_PORT),
verkey=masterVerKeyHex,
pubkey=masterPubKeyHex,)
@@ -84,10 +104,11 @@ def test():
remote0.publee = nacling.Publican(key=remote1.privee.pubhex)
remote1.publee = nacling.Publican(key=remote0.privee.pubhex)
+ print "\n___________Raw Body Test"
data.update(se=1, de=2, bk=raeting.bodyKinds.raw, fk=raeting.footKinds.nacl)
packet0 = packeting.TxPacket(stack=stack0, embody=stuff, data=data, )
packet0.pack()
- print packet0.packed
+ print packet0.packed #not signed if segmented each segment is signed
rejoin = []
if packet0.segmented:
@@ -121,7 +142,9 @@ def test():
body = odict(stuff=stuff)
print body
- data.update(se=1, de=2, bk=raeting.bodyKinds.json, fk=raeting.footKinds.nacl)
+
+ print "\n_____________ Packed Body Test"
+ data.update(se=1, de=2, bk=bk, fk=raeting.footKinds.nacl)
packet0 = packeting.TxPacket(stack=stack0, embody=body, data=data, )
packet0.pack()
print packet0.packed
@@ -149,6 +172,8 @@ def test():
body = odict(stuff=stuff)
print body
+
+ print "\n___________ Encrypted Coat Test "
data.update(se=1, de=2,
bk=raeting.bodyKinds.json,
ck=raeting.coatKinds.nacl,
@@ -186,5 +211,7 @@ def test():
print segmentage.body.data
print segmentage.body.packed == packet0.body.packed
+
if __name__ == "__main__":
test()
+ test(bk=raeting.bodyKinds.msgpack)
View
307 salt/transport/road/raet/test/test_stackBootstrap.py
@@ -0,0 +1,307 @@
+# -*- coding: utf-8 -*-
+'''
+Tests to try out stacking. Potentially ephemeral
+
+'''
+# pylint: skip-file
+
+import os
+
+from ioflo.base.odicting import odict
+from ioflo.base.aiding import Timer
+
+from ioflo.base.consoling import getConsole
+console = getConsole()
+
+from salt.transport.road.raet import (raeting, nacling, packeting, keeping,
+ estating, yarding, transacting, stacking)
+
+
+def test():
+ '''
+ initially
+ master on port 7530 with eid of 1
+ minion on port 7531 with eid of 0
+ eventually
+ master eid of 1
+ minion eid of 2
+ '''
+ console.reinit(verbosity=console.Wordage.concise)
+
+ #master stack
+ masterName = "master"
+ signer = nacling.Signer()
+ masterSignKeyHex = signer.keyhex
+ privateer = nacling.Privateer()
+ masterPriKeyHex = privateer.keyhex
+
+ dirpathMaster = os.path.join(os.getcwd(), 'keep', masterName)
+ road = keeping.RoadKeep(dirpath=dirpathMaster)
+ road.clearLocalData()
+ road.clearAllRemoteData()
+ safe = keeping.SafeKeep(dirpath=dirpathMaster)
+ safe.clearLocalData()
+ safe.clearAllRemoteData()
+ estate = estating.LocalEstate( eid=1,
+ name=masterName,
+ sigkey=masterSignKeyHex,
+ prikey=masterPriKeyHex,)
+ stack0 = stacking.StackUdp(estate=estate, main=True, dirpath=dirpathMaster)
+
+ #minion0 stack
+ minionName0 = "minion0"
+ signer = nacling.Signer()
+ minionSignKeyHex = signer.keyhex
+ privateer = nacling.Privateer()
+ minionPriKeyHex = privateer.keyhex
+
+ dirpathMinion0 = os.path.join(os.getcwd(), 'keep', minionName0)
+ road = keeping.RoadKeep(dirpath=dirpathMinion0)
+ road.clearLocalData()
+ road.clearAllRemoteData()
+ safe = keeping.SafeKeep(dirpath=dirpathMinion0)
+ safe.clearLocalData()
+ safe.clearAllRemoteData()
+ estate = estating.LocalEstate( eid=0,
+ name=minionName0,
+ ha=("", raeting.RAET_TEST_PORT),
+ sigkey=minionSignKeyHex,
+ prikey=minionPriKeyHex,)
+ stack1 = stacking.StackUdp(estate=estate, dirpath=dirpathMinion0)
+
+ #minion1 stack
+ minionName1 = "minion1"
+ signer = nacling.Signer()
+ minionSignKeyHex = signer.keyhex
+ privateer = nacling.Privateer()
+ minionPriKeyHex = privateer.keyhex
+
+ dirpathMinion1 = os.path.join(os.getcwd(), 'keep', minionName1)
+ road = keeping.RoadKeep(dirpath=dirpathMinion1)
+ road.clearLocalData()
+ road.clearAllRemoteData()
+ safe = keeping.SafeKeep(dirpath=dirpathMinion1)
+ safe.clearLocalData()
+ safe.clearAllRemoteData()
+ estate = estating.LocalEstate( eid=0,
+ name=minionName1,
+ ha=("", 7532),
+ sigkey=minionSignKeyHex,
+ prikey=minionPriKeyHex,)
+ stack2 = stacking.StackUdp(estate=estate, dirpath=dirpathMinion1)
+
+ print "\n********* Join Transaction **********"
+ stack1.join()
+ timer = Timer(duration=2)
+ timer.restart(duration=2)
+ while not timer.expired:
+ stack1.serviceAll()
+ stack0.serviceAll()
+ for estate in stack0.estates.values():
+ print "Remote Estate {0} joined= {1}".format(estate.eid, estate.joined)
+ for estate in stack1.estates.values():
+ print "Remote Estate {0} joined= {1}".format(estate.eid, estate.joined)
+
+ print "\n********* Allow Transaction **********"
+ stack1.allow()
+ timer.restart(duration=2)
+ while not timer.expired:
+ stack1.serviceAll()
+ stack0.serviceAll()
+ for estate in stack0.estates.values():
+ print "Remote Estate {0} allowed= {1}".format(estate.eid, estate.allowed)
+ for estate in stack1.estates.values():
+ print "Remote Estate {0} allowed= {1}".format(estate.eid, estate.allowed)
+
+ print "\n********* Message Transactions Both Ways **********"
+ stack1.transmit(odict(house="Oh Boy1", queue="Nice"))
+ stack1.transmit(odict(house="Oh Boy2", queue="Mean"))
+ stack1.transmit(odict(house="Oh Boy3", queue="Ugly"))
+ stack1.transmit(odict(house="Oh Boy4", queue="Pretty"))
+
+ stack0.transmit(odict(house="Yeah Baby1", queue="Good"))
+ stack0.transmit(odict(house="Yeah Baby2", queue="Bad"))
+ stack0.transmit(odict(house="Yeah Baby3", queue="Fast"))
+ stack0.transmit(odict(house="Yeah Baby4", queue="Slow"))
+
+ #segmented packets
+ stuff = []
+ for i in range(300):
+ stuff.append(str(i).rjust(4, " "))
+ stuff = "".join(stuff)
+
+ stack1.transmit(odict(house="Snake eyes", queue="near stuff", stuff=stuff))
+ stack0.transmit(odict(house="Craps", queue="far stuff", stuff=stuff))
+
+ timer.restart(duration=2)
+ while not timer.expired:
+ stack1.serviceAll()
+ stack0.serviceAll()
+
+ print "{0} Received Messages".format(stack0.name)
+ for msg in stack0.rxMsgs:
+ print msg
+ print
+ print "{0} Received Messages".format(stack1.name)
+ for msg in stack1.rxMsgs:
+ print msg
+ print
+
+ print "{0} eid={1}".format(stack0.name, stack0.estate.eid)
+ print "{0} estates=\n{1}".format(stack0.name, stack0.estates)
+ print "{0} transactions=\n{1}".format(stack0.name, stack0.transactions)
+ print "{0} eid={1}".format(stack1.name, stack1.estate.eid)
+ print "{0} estates=\n{1}".format(stack1.name, stack1.estates)
+ print "{0} transactions=\n{1}".format(stack1.name, stack1.transactions)
+
+
+ print "Road {0}".format(stack0.name)
+ print stack0.road.loadLocalData()
+ print stack0.road.loadAllRemoteData()
+ print "Safe {0}".format(stack0.name)
+ print stack0.safe.loadLocalData()
+ print stack0.safe.loadAllRemoteData()
+ print
+
+ print "Road {0}".format(stack1.name)
+ print stack1.road.loadLocalData()
+ print stack1.road.loadAllRemoteData()
+ print "Safe {0}".format(stack1.name)
+ print stack1.safe.loadLocalData()
+ print stack1.safe.loadAllRemoteData()
+ print
+
+ stack0.serverUdp.close()
+ stack1.serverUdp.close()
+ stack2.serverUdp.close()
+
+ #estate0 = stack0.loadLocal()
+ #print estate0.name, estate0.eid, estate0.sid, estate0.ha, estate0.signer, estate0.priver
+ #estate1 = stack1.loadLocal()
+ #print estate1.name, estate1.eid, estate1.sid, estate1.ha, estate1.signer, estate1.priver
+
+ #master stack
+ stack0 = stacking.StackUdp(dirpath=dirpathMaster, main=True)
+
+ #minion0 stack
+ stack1 = stacking.StackUdp(dirpath=dirpathMinion0)
+
+ #minion1 stack
+ stack2 = stacking.StackUdp(dirpath=dirpathMinion1)
+
+ print "\n********* Join Transaction **********"
+ stack1.join()
+ stack2.join()
+ timer = Timer(duration=2)
+ timer.restart(duration=2)
+ while not timer.expired:
+ stack1.serviceAll()
+ stack2.serviceAll()
+ stack0.serviceAll()
+ for estate in stack0.estates.values():
+ print "{0} Remote Estate {1} joined= {2}".format(
+ stack0.name, estate.eid, estate.joined)
+ for estate in stack1.estates.values():
+ print "{0} Remote Estate {1} joined= {2}".format(
+ stack1.name, estate.eid, estate.joined)
+ for estate in stack2.estates.values():
+ print "{0} Remote Estate {1} joined= {2}".format(
+ stack2.name, estate.eid, estate.joined)
+
+ print "\n********* Allow Transaction **********"
+ stack1.allow()
+ stack2.allow()
+ timer.restart(duration=2)
+ while not timer.expired:
+ stack1.serviceAll()
+ stack2.serviceAll()
+ stack0.serviceAll()
+ for estate in stack0.estates.values():
+ print "{0} Remote Estate {1} allowed= {2}".format(
+ stack1.name, estate.eid, estate.allowed)
+ for estate in stack1.estates.values():
+ print "{0} Remote Estate {1} allowed= {2}".format(
+ stack1.name, estate.eid, estate.allowed)
+ for estate in stack2.estates.values():
+ print "{0} Remote Estate {1} allowed= {2}".format(
+ stack2.name, estate.eid, estate.allowed)
+
+ print "\n********* Message Transactions Both Ways **********"
+ #console.reinit(verbosity=console.Wordage.verbose)
+
+ stack1.transmit(odict(house="Oh Boy1", queue="Nice"))
+ stack1.transmit(odict(house="Oh Boy2", queue="Mean"))
+ stack1.transmit(odict(house="Oh Boy3", queue="Ugly"))
+ stack1.transmit(odict(house="Oh Boy4", queue="Pretty"))
+
+ stack2.transmit(odict(house="Really 1", queue="blue"))
+ stack2.transmit(odict(house="Really 2", queue="green"))
+ stack2.transmit(odict(house="Really 3", queue="red"))
+ stack2.transmit(odict(house="Really 4", queue="yello"))
+
+ stack0.transmit(odict(house="Yeah Baby1", queue="Good"))
+ stack0.transmit(odict(house="Yeah Baby2", queue="Bad"))
+ stack0.transmit(odict(house="Yeah Baby3", queue="Fast"))
+ stack0.transmit(odict(house="Yeah Baby4", queue="Slow"))
+
+ stack0.transmit(odict(house="Yeah Momma 1", queue="host"), stack2.estate.eid)
+ stack0.transmit(odict(house="Yeah Momma 1", queue="cold"), stack2.estate.eid)
+ stack0.transmit(odict(house="Yeah Momma 1", queue="boiling"), stack2.estate.eid)
+ stack0.transmit(odict(house="Yeah Momma 1", queue="tepid"), stack2.estate.eid)
+
+ # segmented packets
+ stuff = []
+ for i in range(300):
+ stuff.append(str(i).rjust(4, " "))
+ stuff = "".join(stuff)
+
+ stack1.transmit(odict(house="Snake eyes", queue="near stuff", stuff=stuff))
+ stack0.transmit(odict(house="Craps", queue="far stuff", stuff=stuff))
+ stack2.transmit(odict(house="Lucky duck", queue="medium stuff", stuff=stuff))
+ stack0.transmit(odict(house="Boogle", queue="hight stuff", stuff=stuff), stack2.estate.eid)
+
+ timer.restart(duration=4)
+ while not timer.expired:
+ stack1.serviceAll()
+ stack2.serviceAll()
+ stack0.serviceAll()
+
+ print "{0} Received Messages".format(stack0.name)
+ for msg in stack0.rxMsgs:
+ print msg
+ print
+ print "{0} Received Messages".format(stack1.name)
+ for msg in stack1.rxMsgs:
+ print msg
+ print
+ print "{0} Received Messages".format(stack2.name)
+ for msg in stack2.rxMsgs:
+ print msg
+ print
+
+
+ print "{0} eid={1}".format(stack0.name, stack0.estate.eid)
+ print "{0} estates=\n{1}".format(stack0.name, stack0.estates)
+ print "{0} transactions=\n{1}".format(stack0.name, stack0.transactions)
+ print "{0} eid={1}".format(stack1.name, stack1.estate.eid)
+ print "{0} estates=\n{1}".format(stack1.name, stack1.estates)
+ print "{0} transactions=\n{1}".format(stack1.name, stack1.transactions)
+ print "{0} eid={1}".format(stack2.name, stack2.estate.eid)
+ print "{0} estates=\n{1}".format(stack2.name, stack2.estates)
+ print "{0} transactions=\n{1}".format(stack2.name, stack2.transactions)
+
+ stack0.serverUdp.close()
+ stack1.serverUdp.close()
+ stack2.serverUdp.close()
+
+ #stack0.clearLocal()
+ #stack0.clearAllRemote()
+ stack1.clearLocal()
+ stack1.clearAllRemote()
+ stack2.clearLocal()
+ stack2.clearAllRemote()
+
+
+if __name__ == "__main__":
+ test()
+
View
239 ...transport/road/raet/test/test_stacking.py → ...transport/road/raet/test/test_stackUdp.py
@@ -3,18 +3,20 @@
Tests to try out stacking. Potentially ephemeral
'''
+import os
+
# pylint: skip-file
from ioflo.base.odicting import odict
from ioflo.base.aiding import Timer
from ioflo.base.consoling import getConsole
console = getConsole()
-from salt.transport.road.raet import (raeting, nacling, packeting,
+from salt.transport.road.raet import (raeting, nacling, packeting, keeping,
estating, yarding, transacting, stacking)
-def testStackUdp():
+def testStackUdp(bk=raeting.bodyKinds.json):
'''
initially
master on port 7530 with eid of 1
@@ -25,31 +27,50 @@ def testStackUdp():
'''
console.reinit(verbosity=console.Wordage.concise)
+ stacking.StackUdp.Bk = bk #set class body kind for serialization
+
+ #master stack
+ masterName = "master"
signer = nacling.Signer()
masterSignKeyHex = signer.keyhex
privateer = nacling.Privateer()
masterPriKeyHex = privateer.keyhex
- signer = nacling.Signer()
- minionSignKeyHex = signer.keyhex
- privateer = nacling.Privateer()
- minionPriKeyHex = privateer.keyhex
+ dirpathMaster = os.path.join(os.getcwd(), 'keep', masterName)
+ road = keeping.RoadKeep(dirpath=dirpathMaster)
+ road.clearLocalData()
+ road.clearAllRemoteData()
+ safe = keeping.SafeKeep(dirpath=dirpathMaster)
+ safe.clearLocalData()
+ safe.clearAllRemoteData()
- #master stack
estate = estating.LocalEstate( eid=1,
- name='master',
+ name=masterName,
sigkey=masterSignKeyHex,
prikey=masterPriKeyHex,)
- stack0 = stacking.StackUdp(estate=estate)
+ stack0 = stacking.StackUdp(estate=estate, main=True, dirpath=dirpathMaster)
#minion stack
+ minionName = "minion1"
+ signer = nacling.Signer()
+ minionSignKeyHex = signer.keyhex
+ privateer = nacling.Privateer()
+ minionPriKeyHex = privateer.keyhex
+
+ dirpathMinion = os.path.join(os.getcwd(), 'keep', minionName)
+ road = keeping.RoadKeep(dirpath=dirpathMinion)
+ road.clearLocalData()
+ road.clearAllRemoteData()
+ safe = keeping.SafeKeep(dirpath=dirpathMinion)
+ safe.clearLocalData()
+ safe.clearAllRemoteData()
+
estate = estating.LocalEstate( eid=0,
- name='minion1',
+ name=minionName,
ha=("", raeting.RAET_TEST_PORT),
sigkey=minionSignKeyHex,
prikey=minionPriKeyHex,)
- stack1 = stacking.StackUdp(estate=estate)
-
+ stack1 = stacking.StackUdp(estate=estate, dirpath=dirpathMinion)
print "\n********* Join Transaction **********"
stack1.join()
@@ -288,195 +309,17 @@ def testStackUdp():
for msg in stack1.rxMsgs:
print msg
+ stack0.serverUdp.close()
+ stack1.serverUdp.close()
-def testStackUxd():
- '''
- initially
-
-
- '''
- console.reinit(verbosity=console.Wordage.concise)
-
- #lord stack
- #yard0 = yarding.Yard(name='lord')
- stack0 = stacking.StackUxd()
-
- #serf stack
- #yard1 = yarding.Yard(name='serf', yid=1)
- stack1 = stacking.StackUxd()
-
- stack0.addRemoteYard(stack1.yard)
- stack1.addRemoteYard(stack0.yard)
-
- print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
- print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
- print "{0} names=\n{1}".format(stack0.name, stack0.names)
-
- print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
- print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
- print "{0} names=\n{1}".format(stack1.name, stack1.names)
-
- print "\n********* UXD Message lord to serf serf to lord **********"
- msg = odict(what="This is a message to the serf. Get to Work", extra="Fix the fence.")
- stack0.transmit(msg=msg)
-
- msg = odict(what="This is a message to the lord. Let me be", extra="Go away.")
- stack1.transmit(msg=msg)
-
- timer = Timer(duration=0.5)
- timer.restart()
- while not timer.expired:
- stack0.serviceAll()
- stack1.serviceAll()
-
-
- print "{0} Received Messages".format(stack0.name)
- for msg in stack0.rxMsgs:
- print msg
- print
-
- print "{0} Received Messages".format(stack1.name)
- for msg in stack1.rxMsgs:
- print msg
- print
-
- print "\n********* Multiple Messages Both Ways **********"
-
- stack1.transmit(odict(house="Mama mia1", queue="fix me"), None)
- stack1.transmit(odict(house="Mama mia2", queue="help me"), None)
- stack1.transmit(odict(house="Mama mia3", queue="stop me"), None)
- stack1.transmit(odict(house="Mama mia4", queue="run me"), None)
-
- stack0.transmit(odict(house="Papa pia1", queue="fix me"), None)
- stack0.transmit(odict(house="Papa pia2", queue="help me"), None)
- stack0.transmit(odict(house="Papa pia3", queue="stop me"), None)
- stack0.transmit(odict(house="Papa pia4", queue="run me"), None)
-
- #big packets
- stuff = []
- for i in range(300):
- stuff.append(str(i).rjust(4, " "))
- stuff = "".join(stuff)
-
- stack1.transmit(odict(house="Mama mia1", queue="big stuff", stuff=stuff), None)
- stack0.transmit(odict(house="Papa pia4", queue="gig stuff", stuff=stuff), None)
-
- timer.restart(duration=2)
- while not timer.expired:
- stack1.serviceAll()
- stack0.serviceAll()
-
- print "{0} Received Messages".format(stack0.name)
- for msg in stack0.rxMsgs:
- print msg
- print
-
- print "{0} Received Messages".format(stack1.name)
- for msg in stack1.rxMsgs:
- print msg
- print
-
- src = ('minion', 'serf', None)
- dst = ('master', None, None)
- route = odict(src=src, dst=dst)
- msg = odict(route=route, stuff="Hey buddy what is up?")
- stack0.transmit(msg)
-
- timer.restart(duration=2)
- while not timer.expired:
- stack1.serviceAll()
- stack0.serviceAll()
-
- print "{0} Received Messages".format(stack0.name)
- for msg in stack0.rxMsgs:
- print msg
- print
-
- print "{0} Received Messages".format(stack1.name)
- for msg in stack1.rxMsgs:
- print msg
- print
-
- estate = 'minion1'
- #lord stack yard0
- stack0 = stacking.StackUxd(name='lord', lanename='cherry')
-
- #serf stack yard1
- stack1 = stacking.StackUxd(name='serf', lanename='cherry')
-
- print "Yid", yarding.Yard.Yid
-
- print "\n********* Attempt Auto Accept ************"
- #stack0.addRemoteYard(stack1.yard)
- yard = yarding.Yard( name=stack0.yard.name, prefix='cherry')
- stack1.addRemoteYard(yard)
-
- print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
- print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
- print "{0} names=\n{1}".format(stack0.name, stack0.names)
-
- print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
- print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
- print "{0} names=\n{1}".format(stack1.name, stack1.names)
-
- print "\n********* UXD Message serf to lord **********"
- src = (estate, stack1.yard.name, None)
- dst = (estate, stack0.yard.name, None)
- route = odict(src=src, dst=dst)
- msg = odict(route=route, stuff="Serf to my lord. Feed me!")
- stack1.transmit(msg=msg)
-
- timer = Timer(duration=0.5)
- timer.restart()
- while not timer.expired:
- stack0.serviceAll()
- stack1.serviceAll()
-
-
- print "{0} Received Messages".format(stack0.name)
- for msg in stack0.rxMsgs:
- print msg
- print
-
- print "{0} Received Messages".format(stack1.name)
- for msg in stack1.rxMsgs:
- print msg
- print
-
- print "\n********* UXD Message lord to serf **********"
- src = (estate, stack0.yard.name, None)
- dst = (estate, stack1.yard.name, None)
- route = odict(src=src, dst=dst)
- msg = odict(route=route, stuff="Lord to serf. Feed yourself!")
- stack0.transmit(msg=msg)
-
-
- timer = Timer(duration=0.5)
- timer.restart()
- while not timer.expired:
- stack0.serviceAll()
- stack1.serviceAll()
-
- print "{0} Received Messages".format(stack0.name)
- for msg in stack0.rxMsgs:
- print msg
- print
-
- print "{0} Received Messages".format(stack1.name)
- for msg in stack1.rxMsgs:
- print msg
- print
-
- print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
- print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
- print "{0} names=\n{1}".format(stack0.name, stack0.names)
-
- print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
- print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
- print "{0} names=\n{1}".format(stack1.name, stack1.names)
+ stack0.clearLocal()
+ stack0.clearAllRemote()
+ stack1.clearLocal()
+ stack1.clearAllRemote()
if __name__ == "__main__":
testStackUdp()
- testStackUxd()
+ testStackUdp(bk=raeting.bodyKinds.msgpack)
+
View
158 salt/transport/road/raet/test/test_stackUdpBootstrap1.py
@@ -0,0 +1,158 @@
+# -*- coding: utf-8 -*-
+'''
+Tests to try out stacking. Potentially ephemeral
+
+'''
+# pylint: skip-file
+
+import os
+
+from ioflo.base.odicting import odict
+from ioflo.base.aiding import Timer
+
+from ioflo.base.consoling import getConsole
+console = getConsole()
+
+from salt.transport.road.raet import (raeting, nacling, packeting, keeping,
+ estating, yarding, transacting, stacking)
+
+
+def test():
+ '''
+ initially
+ master on port 7530 with eid of 1
+ minion on port 7531 with eid of 0
+ eventually
+ master eid of 1
+ minion eid of 2
+ '''
+ console.reinit(verbosity=console.Wordage.concise)
+
+ #master stack
+ masterName = "master"
+ signer = nacling.Signer()
+ masterSignKeyHex = signer.keyhex
+ privateer = nacling.Privateer()
+ masterPriKeyHex = privateer.keyhex
+
+ dirpathMaster = os.path.join(os.getcwd(), 'keep', masterName)
+ road = keeping.RoadKeep(dirpath=dirpathMaster)
+ #road.clearLocalData()
+ #road.clearAllRemoteData()
+ safe = keeping.SafeKeep(dirpath=dirpathMaster)
+ #safe.clearLocalData()
+ #safe.clearAllRemoteData()
+ estate = estating.LocalEstate( eid=1,
+ name=masterName,
+ sigkey=masterSignKeyHex,
+ prikey=masterPriKeyHex,)
+ stack0 = stacking.StackUdp(estate=estate, main=True, dirpath=dirpathMaster)
+
+ #minion0 stack
+ minionName0 = "minion0"
+ signer = nacling.Signer()
+ minionSignKeyHex = signer.keyhex
+ privateer = nacling.Privateer()
+ minionPriKeyHex = privateer.keyhex
+
+ dirpathMinion0 = os.path.join(os.getcwd(), 'keep', minionName0)
+ road = keeping.RoadKeep(dirpath=dirpathMinion0)
+ #road.clearLocalData()
+ #road.clearAllRemoteData()
+ safe = keeping.SafeKeep(dirpath=dirpathMinion0)
+ #safe.clearLocalData()
+ #safe.clearAllRemoteData()
+ estate = estating.LocalEstate( eid=0,
+ name=minionName0,
+ ha=("", raeting.RAET_TEST_PORT),
+ sigkey=minionSignKeyHex,
+ prikey=minionPriKeyHex,)
+ stack1 = stacking.StackUdp(estate=estate, dirpath=dirpathMinion0)
+
+
+ print "\n********* Join Transaction **********"
+ stack1.join()
+ timer = Timer(duration=2)
+ timer.restart(duration=2)
+ while not timer.expired:
+ stack1.serviceAll()
+ stack0.serviceAll()
+ for estate in stack0.estates.values():
+ print "Remote Estate {0} joined= {1}".format(estate.eid, estate.joined)
+ for estate in stack1.estates.values():
+ print "Remote Estate {0} joined= {1}".format(estate.eid, estate.joined)
+
+ print "\n********* Allow Transaction **********"
+ stack1.allow()
+ timer.restart(duration=2)
+ while not timer.expired:
+ stack1.serviceAll()
+ stack0.serviceAll()
+ for estate in stack0.estates.values():
+ print "Remote Estate {0} allowed= {1}".format(estate.eid, estate.allowed)
+ for estate in stack1.estates.values():
+ print "Remote Estate {0} allowed= {1}".format(estate.eid, estate.allowed)
+
+ print "\n********* Message Transactions Both Ways **********"
+ stack1.transmit(odict(house="Oh Boy1", queue="Nice"))
+ stack0.transmit(odict(house="Yeah Baby1", queue="Good"))
+
+ #segmented packets
+ stuff = []
+ for i in range(300):
+ stuff.append(str(i).rjust(4, " "))
+ stuff = "".join(stuff)
+
+ stack1.transmit(odict(house="Snake eyes", queue="near stuff", stuff=stuff))
+ stack0.transmit(odict(house="Craps", queue="far stuff", stuff=stuff))
+
+ timer.restart(duration=2)
+ while not timer.expired:
+ stack1.serviceAll()
+ stack0.serviceAll()
+
+ print "{0} Received Messages".format(stack0.name)
+ for msg in stack0.rxMsgs:
+ print msg
+ print
+ print "{0} Received Messages".format(stack1.name)
+ for msg in stack1.rxMsgs:
+ print msg
+ print
+
+ print "{0} eid={1}".format(stack0.name, stack0.estate.eid)
+ print "{0} estates=\n{1}".format(stack0.name, stack0.estates)
+ print "{0} transactions=\n{1}".format(stack0.name, stack0.transactions)
+ print "{0} eid={1}".format(stack1.name, stack1.estate.eid)
+ print "{0} estates=\n{1}".format(stack1.name, stack1.estates)
+ print "{0} transactions=\n{1}".format(stack1.name, stack1.transactions)
+
+
+ print "Road {0}".format(stack0.name)
+ print stack0.road.loadLocalData()
+ print stack0.road.loadAllRemoteData()
+ print "Safe {0}".format(stack0.name)
+ print stack0.safe.loadLocalData()
+ print stack0.safe.loadAllRemoteData()
+ print
+
+ print "Road {0}".format(stack1.name)
+ print stack1.road.loadLocalData()
+ print stack1.road.loadAllRemoteData()
+ print "Safe {0}".format(stack1.name)
+ print stack1.safe.loadLocalData()
+ print stack1.safe.loadAllRemoteData()
+ print
+
+ stack0.serverUdp.close()
+ stack1.serverUdp.close()
+
+ #stack0.clearLocal()
+ #stack0.clearAllRemote()
+ #stack1.clearLocal()
+ #stack1.clearAllRemote()
+
+
+if __name__ == "__main__":
+ test()
+
View
206 salt/transport/road/raet/test/test_stackUxd.py
@@ -0,0 +1,206 @@
+# -*- coding: utf-8 -*-
+'''
+Tests to try out stacking. Potentially ephemeral
+
+'''
+# pylint: skip-file
+from ioflo.base.odicting import odict
+from ioflo.base.aiding import Timer
+
+from ioflo.base.consoling import getConsole
+console = getConsole()
+
+from salt.transport.road.raet import (raeting, nacling, packeting,
+ estating, yarding, transacting, stacking)
+
+
+def testStackUxd():
+ '''
+ initially
+
+
+ '''
+ console.reinit(verbosity=console.Wordage.concise)
+
+ #lord stack
+ #yard0 = yarding.Yard(name='lord')
+ stack0 = stacking.StackUxd()
+
+ #serf stack
+ #yard1 = yarding.Yard(name='serf', yid=1)
+ stack1 = stacking.StackUxd()
+
+ stack0.addRemoteYard(stack1.yard)
+ stack1.addRemoteYard(stack0.yard)
+
+ print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
+ print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
+ print "{0} names=\n{1}".format(stack0.name, stack0.names)
+
+ print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
+ print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
+ print "{0} names=\n{1}".format(stack1.name, stack1.names)
+
+ print "\n********* UXD Message lord to serf serf to lord **********"
+ msg = odict(what="This is a message to the serf. Get to Work", extra="Fix the fence.")
+ stack0.transmit(msg=msg)
+
+ msg = odict(what="This is a message to the lord. Let me be", extra="Go away.")
+ stack1.transmit(msg=msg)
+
+ timer = Timer(duration=0.5)
+ timer.restart()
+ while not timer.expired:
+ stack0.serviceAll()
+ stack1.serviceAll()
+
+
+ print "{0} Received Messages".format(stack0.name)
+ for msg in stack0.rxMsgs:
+ print msg
+ print
+
+ print "{0} Received Messages".format(stack1.name)
+ for msg in stack1.rxMsgs:
+ print msg
+ print
+
+ print "\n********* Multiple Messages Both Ways **********"
+
+ stack1.transmit(odict(house="Mama mia1", queue="fix me"), None)
+ stack1.transmit(odict(house="Mama mia2", queue="help me"), None)
+ stack1.transmit(odict(house="Mama mia3", queue="stop me"), None)
+ stack1.transmit(odict(house="Mama mia4", queue="run me"), None)
+
+ stack0.transmit(odict(house="Papa pia1", queue="fix me"), None)
+ stack0.transmit(odict(house="Papa pia2", queue="help me"), None)
+ stack0.transmit(odict(house="Papa pia3", queue="stop me"), None)
+ stack0.transmit(odict(house="Papa pia4", queue="run me"), None)
+
+ #big packets
+ stuff = []
+ for i in range(300):
+ stuff.append(str(i).rjust(4, " "))
+ stuff = "".join(stuff)
+
+ stack1.transmit(odict(house="Mama mia1", queue="big stuff", stuff=stuff), None)
+ stack0.transmit(odict(house="Papa pia4", queue="gig stuff", stuff=stuff), None)
+
+ timer.restart(duration=2)
+ while not timer.expired:
+ stack1.serviceAll()
+ stack0.serviceAll()
+
+ print "{0} Received Messages".format(stack0.name)
+ for msg in stack0.rxMsgs:
+ print msg
+ print
+
+ print "{0} Received Messages".format(stack1.name)
+ for msg in stack1.rxMsgs:
+ print msg
+ print
+
+ src = ('minion', 'serf', None)
+ dst = ('master', None, None)
+ route = odict(src=src, dst=dst)
+ msg = odict(route=route, stuff="Hey buddy what is up?")
+ stack0.transmit(msg)
+
+ timer.restart(duration=2)
+ while not timer.expired:
+ stack1.serviceAll()
+ stack0.serviceAll()
+
+ print "{0} Received Messages".format(stack0.name)
+ for msg in stack0.rxMsgs:
+ print msg
+ print
+
+ print "{0} Received Messages".format(stack1.name)
+ for msg in stack1.rxMsgs:
+ print msg
+ print
+
+ estate = 'minion1'
+ #lord stack yard0
+ stack0 = stacking.StackUxd(name='lord', lanename='cherry')
+
+ #serf stack yard1
+ stack1 = stacking.StackUxd(name='serf', lanename='cherry')
+
+ print "Yid", yarding.Yard.Yid
+
+ print "\n********* Attempt Auto Accept ************"
+ #stack0.addRemoteYard(stack1.yard)
+ yard = yarding.Yard( name=stack0.yard.name, prefix='cherry')
+ stack1.addRemoteYard(yard)
+
+ print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
+ print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
+ print "{0} names=\n{1}".format(stack0.name, stack0.names)
+
+ print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
+ print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
+ print "{0} names=\n{1}".format(stack1.name, stack1.names)
+
+ print "\n********* UXD Message serf to lord **********"
+ src = (estate, stack1.yard.name, None)
+ dst = (estate, stack0.yard.name, None)
+ route = odict(src=src, dst=dst)
+ msg = odict(route=route, stuff="Serf to my lord. Feed me!")
+ stack1.transmit(msg=msg)
+
+ timer = Timer(duration=0.5)
+ timer.restart()
+ while not timer.expired:
+ stack0.serviceAll()
+ stack1.serviceAll()
+
+
+ print "{0} Received Messages".format(stack0.name)
+ for msg in stack0.rxMsgs:
+ print msg
+ print
+
+ print "{0} Received Messages".format(stack1.name)
+ for msg in stack1.rxMsgs:
+ print msg
+ print
+
+ print "\n********* UXD Message lord to serf **********"
+ src = (estate, stack0.yard.name, None)
+ dst = (estate, stack1.yard.name, None)
+ route = odict(src=src, dst=dst)
+ msg = odict(route=route, stuff="Lord to serf. Feed yourself!")
+ stack0.transmit(msg=msg)
+
+
+ timer = Timer(duration=0.5)
+ timer.restart()
+ while not timer.expired:
+ stack0.serviceAll()
+ stack1.serviceAll()
+
+ print "{0} Received Messages".format(stack0.name)
+ for msg in stack0.rxMsgs:
+ print msg
+ print
+
+ print "{0} Received Messages".format(stack1.name)
+ for msg in stack1.rxMsgs:
+ print msg
+ print
+
+ print "{0} yard name={1} ha={2}".format(stack0.name, stack0.yard.name, stack0.yard.ha)
+ print "{0} yards=\n{1}".format(stack0.name, stack0.yards)
+ print "{0} names=\n{1}".format(stack0.name, stack0.names)
+
+ print "{0} yard name={1} ha={2}".format(stack1.name, stack1.yard.name, stack1.yard.ha)
+ print "{0} yards=\n{1}".format(stack1.name, stack1.yards)
+ print "{0} names=\n{1}".format(stack1.name, stack1.names)
+
+
+
+if __name__ == "__main__":
+ testStackUxd()
View
175 salt/transport/road/raet/transacting.py
@@ -71,18 +71,10 @@ def index(self):
'''
le = self.stack.estate.eid
if le == 0: #bootstapping onto channel use ha
- host = self.stack.estate.host
- if host == '0.0.0.0':
- host = '127.0.0.1'
- le = (host, self.stack.estate.port)
- #le = self.stack.estate.ha
+ le = self.stack.estate.ha
re = self.reid
- if re == 0:
- host = self.stack.estates[self.reid].host
- if host == '0.0.0.0':
- host = '127.0.0.1'
- re = (host, self.stack.estates[self.reid].port)
- #re = self.stack.estates[self.reid].ha
+ if re == 0: #bootstapping onto channel use ha
+ re = self.stack.estates[self.reid].ha
return ((self.rmt, le, re, self.sid, self.tid, self.bcst,))
def process(self):
@@ -93,7 +85,7 @@ def process(self):
def receive(self, packet):
'''
- Process received packet
+ Process received packet Subclasses should super call this
'''
self.rxPacket = packet
@@ -197,6 +189,14 @@ def receive(self, packet):
elif packet.data['pk'] == raeting.pcktKinds.response:
self.accept()
+ def process(self):
+ '''
+ Perform time based processing of transaction
+ '''
+ # need keep sending join until accepted or timed out
+ #self.join()
+
+
def prep(self):
'''
Prepare .txData
@@ -283,19 +283,56 @@ def accept(self):
emsg = "Missing remote crypt key in accept packet"
raise raeting.TransactionError(emsg)
- index = self.index # save before we change it
+ #index = self.index # save before we change it
self.stack.estate.eid = leid
+ self.stack.dumpLocal()
+
remote = self.stack.estates[self.reid]
- remote.verfer = nacling.Verifier(key=verhex)
- remote.pubber = nacling.Publican(key=pubhex)
+ if remote.verfer.keyhex != verhex:
+ remote.verfer = nacling.Verifier(key=verhex)
+ if remote.pubber.keyhex != pubhex:
+ remote.pubber = nacling.Publican(key=pubhex)
+
if remote.eid != reid: #move remote estate to new index
self.stack.moveRemoteEstate(old=remote.eid, new=reid)
if remote.name != name: # rename remote estate to new name
self.stack.renameRemoteEstate(old=remote.name, new=name)
+ self.reid = reid
+
+ # we are assuming for now that the joiner cannot talk peer to peer only
+ # to main estate otherwise we need to ensure unique eid, name, and ha on road
+
+ # Need to verify if remote keys are accepted here
+
remote.joined = True #accepted
remote.nextSid()
- self.remove(index)
+ self.stack.dumpRemote(remote)
+ self.ackAccept() #need to ack before we remove as index as changed
+
+ def ackAccept(self):
+ '''
+ Send ack to accept response
+ '''
+ if self.reid not in self.stack.estates:
+ emsg = "Invalid remote destination estate id '{0}'".format(self.reid)
+ raise raeting.TransactionError(emsg)
+
+ body = odict()
+ packet = packeting.TxPacket(stack=self.stack,
+ kind=raeting.pcktKinds.ack,
+ embody=body,
+ data=self.txData)
+ try:
+ packet.pack()
+ except raeting.PacketError as ex:
+ print ex
+ self.remove(self.rxPacket.index)
+ return
+
+ self.transmit(packet)
+ self.remove(self.rxPacket.index) # since index changed
+
class Joinent(Correspondent):
'''
@@ -311,14 +348,26 @@ def __init__(self, **kwa):
# Since corresponding bootstrap transaction use packet.index not self.index
self.add(self.rxPacket.index)
+ def receive(self, packet):
+ """
+ Process received packet belonging to this transaction
+ """
+ super(Joinent, self).receive(packet) # self.rxPacket = packet
+
+ if packet.data['tk'] == raeting.trnsKinds.join:
+ if packet.data['pk'] == raeting.pcktKinds.ack: #pended
+ self.joined()
+
def process(self):
'''
Perform time based processing of transaction
'''
- # need to perform the check for accepted status somewhere
+ # need to perform the check for accepted status and then send accept
self.accept()
+ # need to retry accept packet until get ackAccept transaction ends
+
def prep(self):
'''
Prepare .txData
@@ -340,6 +389,32 @@ def join(self):
'''
Process join packet
Perform pend operation of pending remote estate being accepted onto channel
+
+ Apply the rules to ensure no colliding estates on (host, port)
+ If matching name estate found then return
+ Rules:
+ Only one estate with given eid is allowed on road
+ Only one estate with given name is allowed on road.
+ Only one estate with given ha on road is allowed on road.
+
+ Are multiple estates with same keys but different name (ha) allowed?
+ Current logic ignores same keys or not
+
+ Since creating new estate assigns unique eid,
+ we are looking for preexisting estates with any eid.
+
+ Processing steps:
+ I) Search remote estates for matching name
+ A) Found remote
+ 1) HA not match
+ Search remotes for other matching HA but different name
+ If found other delete
+ Reuse found remote to be updated and joined
+
+ B) Not found
+ Search remotes for other matching HA
+ If found delete for now
+ Create new remote and update
'''
if not self.stack.parseInner(self.rxPacket):
return
@@ -361,31 +436,37 @@ def join(self):
emsg = "Missing remote crypt key in join packet"
raise raeting.TransactionError(emsg)
- remote = self.stack.fetchRemoteEstateByHostPort(host=data['sh'], port=data['sp'])
+ host = data['sh']
+ port = data['sp']
+ remote = self.stack.fetchRemoteEstateByName(name)
if remote:
- if (name != remote.name or
- verhex != remote.verfer.keyhex or
- pubhex != remote.pubber.keyhex):
- # nack join request another estate at same address but different credentials
- # and return
- pass
- # accept here and return
-
- if name in self.stack.eids:
- emsg = "Another estate with name already exists"
- raise raeting.TransactionError(emsg)
-
-
-
- remote = estating.RemoteEstate( stack=self.stack,
- name=name,
- host=data['sh'],
- port=data['sp'],
- verkey=verhex,
- pubkey=pubhex,
- rsid=self.sid,
- rtid=self.tid, )
- self.stack.addRemoteEstate(remote) #provisionally add .accepted is None
+ if not (host == remote.host and port == remote.port):
+ other = self.stack.fetchRemoteEstateByHostPort(host, port)
+ if other and other is not remote: #may need to terminate transactions
+ self.stack.removeRemoteEstate(other.eid)
+ remote.host = host
+ remote.port = port
+ if remote.verfer.keyhex != verhex:
+ remote.verfer = nacling.Verifier(verhex)
+ if remote.pubber.keyhex != pubhex:
+ remote.pubber = nacling.Publican(pubhex)
+ remote.rsid = self.sid
+ remote.rtid = self.tid
+ else:
+ other = self.stack.fetchRemoteEstateByHostPort(host, port)
+ if other: #may need to terminate transactions
+ self.stack.removeRemoteEstate(other.eid)
+ remote = estating.RemoteEstate( stack=self.stack,
+ name=name,
+ host=host,
+ port=port,
+ verkey=verhex,
+ pubkey=pubhex,
+ rsid=self.sid,
+ rtid=self.tid, )
+ self.stack.addRemoteEstate(remote) #provisionally add .accepted is None
+
+ self.stack.dumpRemote(remote)
self.reid = remote.eid # auto generated at instance creation above
self.ackJoin()
@@ -398,7 +479,7 @@ def ackJoin(self):
emsg = "Invalid remote destination estate id '{0}'".format(self.reid)
raise raeting.TransactionError(emsg)
- #since bootstrap transaction use updated self.rid
+ #since bootstrap transaction use updated self.reid changed in self.join()
self.txData.update( dh=self.stack.estates[self.reid].host,
dp=self.stack.estates[self.reid].port,)
body = odict()
@@ -440,9 +521,17 @@ def accept(self):
print ex
self.remove(self.rxPacket.index)
return
+
+ self.transmit(packet)
+
+ def joined(self):
+ '''
+ process ack to accept response
+ '''
+ remote = self.stack.estates[self.reid]
remote.joined = True # accepted
remote.nextSid()
- self.transmit(packet)
+ self.stack.dumpRemote(remote)
self.remove(self.rxPacket.index)
class Allower(Initiator):

0 comments on commit 589eba5

Please sign in to comment.