Skip to content
Browse files

allow lantorrent to write to many files on any given node.

still need to make daemon use this feature
  • Loading branch information...
1 parent b7cdc9b commit f434b5c06190c4e35cbb6c492bc4dc889f194336 BuzzTroll committed Sep 7, 2010
View
4 control/bin/ltclient.sh
@@ -10,8 +10,4 @@ ltcs=$8
ssh "$@"
rc=$?
-
-if [ $rc -eq 0 ]; then
- md5sum $localpath
-fi
exit $rc
View
4 lantorrent/pylantorrent/__init__.py
@@ -24,12 +24,12 @@ def log(level, msg, tb=None):
logging.log(level, "===========")
logging.log(level, sys.exc_info()[0])
-def create_endpoint_entry(host, dest_file, data_size, port=2893, block_size=128*1024, degree=1, rid=None):
+def create_endpoint_entry(host, dest_files, data_size, port=2893, block_size=128*1024, degree=1, rid=None):
if rid == None:
rid = str(uuid.uuid1())
final = {}
- final['file'] = dest_file
+ final['files'] = dest_files
final['host'] = host
final['port'] = port
final['block_size'] = block_size
View
7 lantorrent/pylantorrent/client.py
@@ -93,15 +93,16 @@ def main(argv=sys.argv[1:]):
filename = "/" + a[1].strip()
rid = str(uuid.uuid1())
- json_dest = pylantorrent.create_endpoint_entry(host, filename, data_size, port, block_size, degree, rid)
+ filenames = [filename,]
+ json_dest = pylantorrent.create_endpoint_entry(host, filenames, data_size, port, block_size, degree, rid)
dests.append(json_dest)
l = sys.stdin.readline()
cnt = cnt + 1
# for the sake of code resuse this will just be piped into an
# lt daemon processor. /dev/null is used to supress a local write
- final = pylantorrent.create_endpoint_entry("localhost", "/dev/null")
+ final = pylantorrent.create_endpoint_entry("localhost", ["/dev/null",])
final['destinations'] = dests
c = LTClient(argv[0], final)
@@ -117,7 +118,7 @@ def main(argv=sys.argv[1:]):
e['message'] = "Unknown error. Please retry"
else:
e = e['emsg']
- print "ERROR: %s:%s%s %s" % (e['host'], e['port'], e['file'], e['message'])
+ print "ERROR: %s:%s%s %s" % (e['host'], e['port'], str(e['files']), e['message'])
print "Succesfully sent to %d" % (c.success_count)
return 0
View
4 lantorrent/pylantorrent/daemon.py
@@ -43,7 +43,7 @@ def do_it_live(con, rows):
json_dest = {}
json_dest['host'] = r[0]
json_dest['port'] = int(r[1])
- json_dest['file'] = dst_filename
+ json_dest['files'] = [dst_filename]
json_dest['id'] = r[4]
json_dest['block_size'] = 128*1024
json_dest['degree'] = 1
@@ -53,7 +53,7 @@ def do_it_live(con, rows):
final = {}
# for the sake of code resuse this will just be piped into an
# lt daemon processor. /dev/null is used to supress a local write
- final['file'] = "/dev/null"
+ final['files'] = ["/dev/null"]
final['host'] = "localhost"
final['port'] = 2893
final['block_size'] = 131072
View
8 lantorrent/pylantorrent/ltConnection.py
@@ -23,7 +23,7 @@ def __init__(self, json_ent, output_printer):
try:
self.host = json_ent['host']
self.port = int(json_ent['port'])
- self.file = json_ent['file']
+ self.files = json_ent['files']
self.rid = json_ent['id']
self.block_size = int(json_ent['block_size'])
self.degree = int(json_ent['degree'])
@@ -38,7 +38,7 @@ def __init__(self, json_ent, output_printer):
s.connect((self.host, self.port))
self.socket = s
except Exception, ex:
- vex = LTException(505, "%s:%d" % (self.host, self.port), self.host, self.port, self.file, self.rid)
+ vex = LTException(505, "%s:%d" % (self.host, self.port), self.host, self.port, self.files, self.rid)
pylantorrent.log(logging.ERROR, str(vex), traceback)
raise vex
@@ -56,7 +56,7 @@ def send_header(self, destinations):
return
header = {}
- header['file'] = self.file
+ header['files'] = self.files
header['host'] = self.host
header['port'] = self.port
header['id'] = self.rid
@@ -79,7 +79,7 @@ def send(self, data):
self.socket.send(data)
except Exception, ex:
self.valid = False
- self.ex = LTException(506, "%s:%d %s" % (self.host, self.port, str(ex)), self.host, self.port, self.file, self.rid)
+ self.ex = LTException(506, "%s:%d %s" % (self.host, self.port, str(ex)), self.host, self.port, self.files, self.rid)
pylantorrent.log(logging.WARNING, "send error " + str(self.ex), traceback)
j = self.ex.get_json()
s = json.dumps(j)
View
8 lantorrent/pylantorrent/ltException.py
@@ -19,17 +19,17 @@ class LTException(Exception):
errorsCode[509] = "completion status never recieved %s"
errorsCode[510] = "Incorrect checksum %s"
- def __init__(self, code, msg, host=None, port=None, filename=None, rid=None, md5sum=""):
+ def __init__(self, code, msg, host=None, port=None, filenames=None, rid=None, md5sum=""):
self.code = code
self.host = host
self.port = port
- self.filename = filename
+ self.filenames = filenames
self.rid = rid
self.msg = LTException.errorsCode[code] % (msg)
self.md5sum = md5sum
def __str__(self):
- return "%s %d %s:%s%s %s\r\n" % (self.rid, self.code, str(self.host), str(self.port), self.filename, self.msg)
+ return "%s %d %s:%s%s %s\r\n" % (self.rid, self.code, str(self.host), str(self.port), str(self.filenames), self.msg)
#
# results json
@@ -52,7 +52,7 @@ def get_json(self):
header['code'] = self.code
header['host'] = self.host
header['port'] = self.port
- header['file'] = self.filename
+ header['files'] = self.filenames
header['id'] = self.rid
header['message'] = self.msg
header['md5sum'] = self.md5sum
View
30 lantorrent/pylantorrent/server.py
@@ -71,7 +71,7 @@ def read_header(self):
# verify the header
try:
- file_name = self.json_header['file']
+ file_names = self.json_header['files']
host = self.json_header['host']
port = int(self.json_header['port'])
id = self.json_header['id']
@@ -90,7 +90,7 @@ def create_dest_table(self, destinations):
rid = d['id']
host = d['host']
port = d['port']
- filename = d['file']
+ filenames = d['files']
except Exception, ex:
raise LTException(504, str(ex))
dests[rid] = d
@@ -136,11 +136,16 @@ def store_and_forward(self):
header = self.json_header
ex_array = []
- try:
- filename = header['file']
- f = open(filename, "w")
- except Exception, ex:
- raise LTException(503, str(ex), header['host'], int(header['port']), header['file'], header['id'])
+ filenames_a = header['files']
+
+ files_a = []
+ for filename in filenames_a:
+ try:
+ f = open(filename, "w")
+ files_a.append(f)
+ except Exception, ex:
+ pylantorrent.log(logging.ERROR, "Failed to open %s" % (filename))
+ raise LTException(503, str(ex), header['host'], int(header['port']), filenames_a, header['id'])
destinations = header['destinations']
v_con_array = self.get_valid_vcons(destinations)
@@ -158,20 +163,23 @@ def store_and_forward(self):
md5er.update(data)
for v_con in v_con_array:
v_con.send(data)
- f.write(data)
+ for f in files_a:
+ f.write(data)
read_count = read_count + len(data)
md5str = str(md5er.hexdigest()).strip()
except Exception, ex:
for v_con in v_con_array:
v_con.close()
- f.close()
+ for f in files_a:
+ f.close()
raise ex
- f.close()
+ for f in files_a:
+ f.close()
pylantorrent.log(logging.DEBUG, "All data sent %s" % (md5str))
# if we got to here it was successfully written to a file
# and we can call it success
- vex = LTException(0, filename, header['host'], int(header['port']), header['file'], header['id'], md5sum=md5str)
+ vex = LTException(0, filename, header['host'], int(header['port']), header['files'], header['id'], md5sum=md5str)
j = vex.get_json()
s = json.dumps(j)
self.print_results(s)
View
2 lantorrent/tests/many_xfer_test.py
@@ -32,7 +32,7 @@ def _t_new_dest(self, host, port, sz=128*1024):
(osf, fname) = tempfile.mkstemp()
self.files.append(fname)
- ent = pylantorrent.create_endpoint_entry(host, fname, self.src_size, port=int(port), block_size=sz)
+ ent = pylantorrent.create_endpoint_entry(host, [fname], self.src_size, port=int(port), block_size=sz)
os.close(osf)
return (fname, ent)
View
4 lantorrent/tests/simple_test.py
@@ -26,7 +26,7 @@ def tearDown(self):
pass
def test_xfer_one_null_no_check(self):
- final = pylantorrent.create_endpoint_entry(self.host, "/dev/null", self.src_size)
+ final = pylantorrent.create_endpoint_entry(self.host, ["/dev/null"], self.src_size)
final['destinations'] = []
c = LTClient(self.src_file, final)
v = LTServer(c, c)
@@ -36,7 +36,7 @@ def _t_xfer_one_file_check(self, sz=128*1024):
(osf, fname) = tempfile.mkstemp()
try:
- final = pylantorrent.create_endpoint_entry(self.host, fname, self.src_size, block_size=sz)
+ final = pylantorrent.create_endpoint_entry(self.host, [fname], self.src_size, block_size=sz)
final['destinations'] = []
c = LTClient(self.src_file, final)
v = LTServer(c, c)
View
4 lantorrent/tests/xfer_test.py
@@ -26,7 +26,7 @@ def tearDown(self):
pass
def test_xfer_one_null_no_check(self):
- final = pylantorrent.create_endpoint_entry(self.host, "/dev/null", self.src_size)
+ final = pylantorrent.create_endpoint_entry(self.host, ["/dev/null"], self.src_size)
final['destinations'] = []
c = LTClient(self.src_file, final)
v = LTServer(c, c)
@@ -36,7 +36,7 @@ def _t_xfer_one_file_check(self, sz=128*1024):
(osf, fname) = tempfile.mkstemp()
try:
- final = pylantorrent.create_endpoint_entry(self.host, fname, self.src_size, block_size=sz)
+ final = pylantorrent.create_endpoint_entry(self.host, [fname], self.src_size, block_size=sz)
final['destinations'] = []
c = LTClient(self.src_file, final)
v = LTServer(c, c)

0 comments on commit f434b5c

Please sign in to comment.
Something went wrong with that request. Please try again.