Skip to content
Browse files

refactor to preserve sanity.

  • Loading branch information...
1 parent 0ff2d63 commit ee53c597ffacfd95928187a98136fc43e0e13b46 BuzzTroll committed Nov 18, 2010
View
142 control/src/python/workspacecontrol/defaults/imageprocurement/propagate_lantorrent.py
@@ -1,142 +0,0 @@
-from commands import getstatusoutput
-import os
-import string
-from propagate_adapter import PropagationAdapter
-from workspacecontrol.api.exceptions import *
-import propagate_scp
-import urlparse
-import workspacecontrol.main.wc_args as wc_args
-import socket
-
-class LantorrentPropadapter(propagate_scp.propadapter):
-
- def __init__(self, params, common):
- propagate_scp.propadapter.__init__(self, params, common)
- self.ssh = None
- self.ltport = None
- self.ltip = None
- self.scheme = "lantorrent://"
-
- def validate(self):
- # validate scp adaptor
- propagate_scp.propadapter.validate(self)
- self.c.log.debug("validating lantorrent propagation adapter")
-
- self.ltip = self.p.get_conf_or_none("propagation", "lantorrentip")
- if not self.ltip:
- self.ltip = ""
-
- self.ltport = self.p.get_conf_or_none("propagation", "lantorrentport")
- self.ltport = int(self.ltport)
- if not self.ltport:
- self.ltport = 2893
-
- self.ssh = self.p.get_conf_or_none("propagation", "lantorrentexe")
- if not self.ssh:
- raise InvalidConfig("no path to ssh")
-
- if os.path.isabs(self.scp):
- if not os.access(self.scp, os.F_OK):
- raise InvalidConfig("SSH is configured with an absolute path, but it does not seem to exist: '%s'" % self.ssh)
-
- if not os.access(self.scp, os.X_OK):
- raise InvalidConfig("SSH is configured with an absolute path, but it does not seem executable: '%s'" % self.ssh)
-
- self.c.log.debug("SSH configured: %s" % self.ssh)
-
- self.sshuser = self.p.get_conf_or_none("propagation", "ssh_user")
- if self.sshuser:
- self.c.log.debug("SSH default user: %s" % self.sshuser)
- else:
- self.c.log.debug("no SSH default user")
-
- def translate_to_scp(self, imagestr):
- if imagestr[:len(self.scheme)] != self.scheme:
- raise InvalidInput("scp trans invalid lantorrent url, not %s %s" % (self.scheme, imagestr))
- url = "scp://" + imagestr[len(self.scheme):]
- url_a = url.split("?")
- return url_a[0]
-
-
- def validate_unpropagate_target(self, imagestr):
- imagestr = self.translate_to_scp(imagestr)
- propagate_scp.propadapter.validate_unpropagate_target(self, imagestr)
-
- def unpropagate(self, local_absolute_source, remote_target):
- remote_target = self.translate_to_scp(remote_target)
- propagate_scp.propadapter.unpropagate(self, local_absolute_source, remote_target)
-
- def validate_propagate_source(self, imagestr):
- # will throw errors if invalid
- self._lt_command("fake", imagestr)
-
- def propagate(self, remote_source, local_absolute_target):
- self.c.log.info("lantorrent propagation - remote source: %s" % remote_source)
- self.c.log.info("lantorrent propagation - local target: %s" % local_absolute_target)
-
- cmd = self._lt_command(local_absolute_target, remote_source)
- self.c.log.info("Running lantorrent command: %s" % cmd)
-
- ret,output = getstatusoutput(cmd)
- if ret:
- errmsg = "problem running command: '%s' ::: return code" % cmd
- errmsg += ": %d ::: output:\n%s" % (ret, output)
- self.c.log.error(errmsg)
- raise UnexpectedError(errmsg)
- else:
- self.c.log.info("Successfully ran %s. output %s" % (cmd, output))
-
- self.c.log.info("Transfer complete.")
-
- def _lt_command(self, local, remote):
- """
- Remote url: lantorrent://hostname:port/path.
- """
-
- if remote[:len(self.scheme)] != self.scheme:
- raise InvalidInput("get command invalid lantorrent url, not %s %s" % (self.scheme, remote))
-
- url = remote
- lt_exe = self.p.get_arg_or_none(wc_args.EXTRA_ARGS)
- if lt_exe == None:
- raise InvalidInput("the prop-extra-args parameter must be used and be a path to the remote execution script")
-
-
- parts = url.split('://', 1)
- fake_url = "http://" + parts[1]
- up = urlparse.urlparse(fake_url)
-
- xfer_host = up.hostname
- xfer_user = up.username
- xfer_port = int(up.port)
- if xfer_port == None:
- xfer_port = 22
- xfer_path = up.path
-
- if xfer_user:
- self.c.log.info("allowing client to specify this account: %s" % xfer_user)
- else:
- self.c.log.debug("client did not specify account")
-
- # if default is not specified, we just uses current account
- if self.sshuser:
- self.c.log.debug("using the default ssh account")
- xfer_user = self.sshuser
- else:
- self.c.log.debug("using the program runner for ssh account")
-
- if xfer_user:
- xfer_user = xfer_user + "@"
- else:
- xfer_user = ""
- try:
- import uuid
- rid = str(uuid.uuid1())
- except:
- import commands
- rid = commands.getoutput("uuidgen")
- cmd = self.ssh + " %d %s%s %s %s %s %s %s:%d" % (xfer_port, xfer_user, xfer_host, lt_exe, xfer_path, local, rid, self.ltip, self.ltport)
-
- self.c.log.debug("lantorrent command %s " % (cmd))
-
- return cmd
View
76 lantorrent/pylantorrent/client.py
@@ -5,7 +5,10 @@
import pylantorrent
from pylantorrent.server import LTServer
from pylantorrent.ltException import LTException
-import simplejson as json
+try:
+ import json
+except ImportError:
+ import simplejson as json
import traceback
import uuid
import hashlib
@@ -25,6 +28,9 @@ def __init__(self, filename, json_header):
self.header_lines.append("EOH : %s" % (auth_hash))
self.errors = []
self.complete = {}
+ self.file_data = True
+ self.pau = False
+ self.incoming_data = ""
self.dest = {}
ld = json_header['destinations']
@@ -44,34 +50,66 @@ def __init__(self, filename, json_header):
self.md5er = hashlib.md5()
+ def flush(self):
+ pass
+
def readline(self):
if len(self.header_lines) == 0:
return None
l = self.header_lines.pop(0)
return l
- def read(self, blocksize):
- d = self.data_file.read(blocksize)
- self.md5er.update(d)
+ def read(self, blocksize=1):
+ pylantorrent.log(logging.DEBUG, "begin reading.... pau is %s" % (str(self.pau)))
+
+ if self.pau:
+ pylantorrent.log(logging.DEBUG, "is pau")
+ return None
+ pylantorrent.log(logging.DEBUG, "reading.... ")
+ if self.file_data:
+ d = self.data_file.read(blocksize)
+ if not d:
+ pylantorrent.log(logging.DEBUG, "no mo file data")
+ self.file_data = False
+ else:
+ pylantorrent.log(logging.DEBUG, "### data len = %d" % (len(d)))
+ self.md5er.update(d)
+ return d
+ pylantorrent.log(logging.DEBUG, "check footer")
+ if not self.file_data:
+ pylantorrent.log(logging.DEBUG, "getting footer")
+ foot = {}
+ self.md5str = str(self.md5er.hexdigest()).strip()
+ foot['md5sum'] = self.md5str
+ d = json.dumps(foot)
+ pylantorrent.log(logging.DEBUG, "getting footer is now %s" % (d))
+ self.pau = True
+
return d
def close(self):
- self.md5str = str(md5er.hexdigest()).strip()
+ self.md5str = str(self.md5er.hexdigest()).strip()
close(self.data_file)
def write(self, data):
- try:
- json_outs = json.loads(data)
- rid = json_outs['id']
- if int(json_outs['code']) == 0:
- c = self.dest.pop(rid)
- self.complete[rid] = json_out
- self.success_count = self.success_count + 1
- else:
- d = self.dest[rid]
- d['emsg'] = json_outs
- except Exception, ex:
- pass
+ self.incoming_data = self.incoming_data + data
+
+ def process_incoming_data(self):
+ lines = self.incoming_data.split('\n')
+ for data in lines:
+ try:
+ json_outs = json.loads(data)
+ rid = json_outs['id']
+ if int(json_outs['code']) == 0:
+ c = self.dest.pop(rid)
+ self.complete[rid] = json_out
+ self.success_count = self.success_count + 1
+ else:
+ d = self.dest[rid]
+ d['emsg'] = json_outs
+ except Exception, ex:
+ pass
+ self.incoming_data = ""
def check_sum(self):
for rid in self.complete.keys():
@@ -80,9 +118,12 @@ def check_sum(self):
raise Exception("There was data corruption in the chain")
def get_incomplete(self):
+ self.process_incoming_data()
return self.dest
+
+
def main(argv=sys.argv[1:]):
dests = []
@@ -118,6 +159,7 @@ def main(argv=sys.argv[1:]):
c = LTClient(argv[0], final)
v = LTServer(c, c)
v.store_and_forward()
+ v.clean_up()
c.close()
c.check_sum()
View
5 lantorrent/pylantorrent/daemon.py
@@ -6,7 +6,10 @@
import pylantorrent
from pylantorrent.server import LTServer
from pylantorrent.client import LTClient
-import simplejson as json
+try:
+ import json
+except ImportError:
+ import simplejson as json
import traceback
import uuid
import time
View
186 lantorrent/pylantorrent/ltConnection.py
@@ -1,20 +1,29 @@
import sys
import os
-import simplejson as json
+try:
+ import json
+except ImportError:
+ import simplejson as json
import socket
import logging
import traceback
from pylantorrent.ltException import LTException
import pylantorrent
-import threading
+import select
+import zlib
+class LTDataTransformZip(object):
-class LTConnection(object):
+ def incoming_data(self, data):
+ return data
- def __init__(self, json_ent, output_printer):
+class LTDestConnection(object):
+
+ def __init__(self, json_ent, output_printer, data_transform=None):
self.ex = None
self.read_buffer_len = 1024
self.output_printer = output_printer
+ self.data_transform = data_transform
if json_ent == None:
self.valid = False
@@ -36,6 +45,7 @@ def __init__(self, json_ent, output_printer):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((self.host, self.port))
self.socket = s
+ self.socket.setblocking(0)
except Exception, ex:
vex = LTException(505, "%s:%d" % (self.host, self.port), self.host, self.port, reqs=self.requests)
pylantorrent.log(logging.ERROR, str(vex), traceback)
@@ -69,52 +79,154 @@ def send_header(self, destinations):
self.send(send_str)
self.send("EOH : %s\r\n" % (signature))
- def send(self, data):
+ def _poll(self, poll_period=0):
+ if not self.valid:
+ return
+ try:
+ data = self._read_from_socket(self.read_buffer_len)
+ if data:
+ self.output_printer.print_results(data)
+ except:
+ # there may jsut be no data now
+ pass
+
+ def _read_from_socket(self, size):
+ data = self.socket.recv(size)
+ return data
+
+ def _write_to_socket(self, data):
+ self.socket.sendall(data)
+
+ def read_to_eof(self):
if not self.valid:
return
+ self.socket.setblocking(1)
+ data = self._read_from_socket(self.read_buffer_len)
+ while data:
+ self.output_printer.print_results(data)
+ data = self._read_from_socket(self.read_buffer_len)
+ def send(self, data):
+ if not self.valid:
+ return
try:
- self.socket.send(data)
+ self._write_to_socket(data)
except Exception, ex:
self.valid = False
- self.ex = LTException(506, "%s:%d %s" % (self.host, self.port, str(ex)), self.host, self.port, self.requests)
+ self.ex = LTException(506, "%s:%s %s" % (self.host, str(self.port), str(ex)), self.host, self.port, self.requests)
pylantorrent.log(logging.WARNING, "send error " + str(self.ex), traceback)
j = self.ex.get_json()
s = json.dumps(j)
self.output_printer.print_results(s)
+ # see if there is anything to read
+ self._poll()
- def read_output(self):
- line = ""
- while True:
- try:
- data = self.socket.recv(self.read_buffer_len)
- except:
- data = ""
- line = line + str(data)
- la = line.split('\n')
- while len(la) > 1:
- z = la.pop(0)
- pylantorrent.log(logging.DEBUG, "got resutls %s" % (z))
- if z.strip() == "EOD":
- break
- self.output_printer.print_results(z)
-
- line = la.pop(0)
- if not data or data == "":
- break
-
- def close(self):
- try:
- self.valid = False
- #self.read_thread.join()
- self.socket.close()
- except:
- pass
- def close_read(self):
- if not self.valid:
- return
+ def close(self, force=False):
+ # reading of footer waits for eof so this is needed
self.socket.shutdown(socket.SHUT_WR)
+ self.read_to_eof()
+ self.valid = False
+ self.socket.close()
def get_exception(self):
return self.ex
+
+
+class LTSourceConnection(object):
+
+ def __init__(self, infile_obj, data_transform=None):
+ self.inf = infile_obj
+ self.footer = None
+ self.header = None
+ self.max_header_lines = 256
+ self.data_transform = data_transform
+
+ def _read(self, bs=None):
+ if bs == None:
+ d = self.inf.read()
+ else:
+ d = self.inf.read(bs)
+ return d
+
+ def _readline(self):
+ l = self.inf.readline()
+ return l
+
+ def read_footer(self, md5str):
+ if self.footer:
+ return self.footer
+
+ pylantorrent.log(logging.DEBUG, "begin reading the footer")
+ lines = ""
+ l = self._read()
+ while l:
+ lines = lines + l
+ l = self._read()
+ pylantorrent.log(logging.DEBUG, "footer is %s" % (lines))
+ foot = json.loads(lines)
+ if foot['md5sum'] != md5str:
+ raise LTException(510, "%s != %s" % (md5str, foot['md5sum']), header['host'], int(header['port']), requests_a, md5sum=md5str)
+ self.footer = foot
+ return foot
+
+
+ def read_header(self):
+ if self.header:
+ return self.header
+
+ pylantorrent.log(logging.INFO, "reading a new header")
+
+ count = 0
+ lines = ""
+ l = self._readline()
+ while l:
+ ndx = l.find("EOH : ")
+ if ndx == 0:
+ break
+ lines = lines + l
+ l = self._readline()
+ count = count + 1
+ if count == self.max_header_lines:
+ raise LTException(501, "%d lines long, only %d allowed" % (count, max_header_lines))
+ if l == None:
+ raise LTException(501, "No signature found")
+ signature = l[len("EOH : "):].strip()
+
+ auth_hash = pylantorrent.get_auth_hash(lines)
+
+ if auth_hash != signature:
+ pylantorrent.log(logging.INFO, "ACCESS DENIED |%s| != |%s| -->%s<---" % (auth_hash, signature, lines))
+ raise LTException(508, "%s is a bad signature" % (auth_hash))
+
+ self.header = json.loads(lines)
+ return self.header
+
+ # verify the header
+ try:
+ reqs = self.header['requests']
+ for r in reqs:
+ filename = r['filename']
+ rid = r['id']
+ rn = r['rename']
+
+ host = self.header['host']
+ port = int(self.header['port'])
+ urls = self.header['destinations']
+ degree = int(self.header['degree'])
+ data_length = long(self.header['length'])
+ except Exception, ex:
+ raise LTException(502, str(ex), traceback)
+
+ def read_data(self, bs):
+ return self._read(bs)
+
+class LTDestConnectionZip(LTDestConnection):
+
+ def __init__(self, json_ent, output_printer):
+ LTDestConnection.__init__(self, json_ent, output_printer)
+
+class LTSourceConnectionZip(LTSourceConnection):
+
+ def __init__(self, infile_obj):
+ LTSourceConnection.__init__(self, infile_obj)
View
5 lantorrent/pylantorrent/ltException.py
@@ -1,6 +1,9 @@
import sys
import socket
-import simplejson as json
+try:
+ import json
+except ImportError:
+ import simplejson as json
import os
class LTException(Exception):
View
5 lantorrent/pylantorrent/request.py
@@ -6,7 +6,10 @@
import pylantorrent
from pylantorrent.server import LTServer
from pylantorrent.client import LTClient
-import simplejson as json
+try:
+ import json
+except ImportError:
+ import simplejson as json
import traceback
import uuid
import time
View
225 lantorrent/pylantorrent/server.py
@@ -4,10 +4,12 @@
import logging
import pylantorrent
from pylantorrent.ltException import LTException
-from pylantorrent.ltConnection import LTConnection
-import simplejson as json
+from pylantorrent.ltConnection import *
+try:
+ import json
+except ImportError:
+ import simplejson as json
import traceback
-import threading
import hashlib
# The first thing sent is a json header terminated by a single line
@@ -30,84 +32,67 @@
class LTServer(object):
def __init__(self, inf, outf):
- self.lock = threading.Lock()
self.json_header = {}
- self.inf = inf
+ self.source_conn = LTSourceConnection(inf)
self.outf = outf
self.max_header_lines = 102400
self.block_size = 128*1024
- self.read_header()
- self.suffix = ".lattorrent"
+ self.suffix = ".lantorrent"
self.created_files = []
+ self.v_con_array = []
+ self.files_a = []
+ self.md5str = None
- def clean_up(self):
+ def _close_files(self):
+ for f in self.files_a:
+ f.close()
+ self.files_a = []
+
+ def _close_connections(self):
+ for v_con in self.v_con_array:
+ v_con.close()
+ self.v_con_array = []
+
+ def clean_up(self, force=False):
+ self._close_connections()
+ self._close_files()
+ pylantorrent.log(logging.DEBUG, "cleaning up")
for f in self.created_files:
try:
+ pylantorrent.log(logging.DEBUG, "deleting file %s" % (f))
os.remove(f)
except:
pass
+ self.created_files = []
- def read_header(self):
- max_header_lines = 256
- pylantorrent.log(logging.INFO, "reading a new header")
-
- count = 0
- lines = ""
- l = self.inf.readline()
- while l:
- ndx = l.find("EOH : ")
- if ndx == 0:
- break
- lines = lines + l
- l = self.inf.readline()
- count = count + 1
- if count == self.max_header_lines:
- raise LTException(501, "%d lines long, only %d allowed" % (count, max_header_lines))
- if l == None:
- raise LTException(501, "No signature found")
- signature = l[len("EOH : "):].strip()
-
- auth_hash = pylantorrent.get_auth_hash(lines)
-
- if auth_hash != signature:
- pylantorrent.log(logging.INFO, "ACCESS DENIED |%s| != |%s| -->%s<---" % (auth_hash, signature, lines))
- raise LTException(508, "%s is a bad signature" % (auth_hash))
-
- self.json_header = json.loads(lines)
-
- # verify the header
- try:
- reqs = self.json_header['requests']
- for r in reqs:
- filename = r['filename']
- rid = r['id']
- rn = r['rename']
-
- host = self.json_header['host']
- port = int(self.json_header['port'])
- urls = self.json_header['destinations']
- self.degree = int(self.json_header['degree'])
- self.data_length = long(self.json_header['length'])
- except Exception, ex:
- raise LTException(502, str(ex), traceback)
+ def _read_footer(self):
+ self.footer = self.source_conn.read_footer(self.md5str)
+
+ def _send_footer(self):
+ foot = {}
+ foot['md5sum'] = self.md5str
+ foot_str = json.dumps(foot)
+ pylantorrent.log(logging.DEBUG, "sending footer %s" % (foot_str))
+ for v_con in self.v_con_array:
+ v_con.send(foot_str)
+
+ def _read_header(self):
+ self.json_header = self.source_conn.read_header()
+ self.degree = int(self.json_header['degree'])
+ self.data_length = long(self.json_header['length'])
def print_results(self, s):
- pylantorrent.log(logging.DEBUG, "printing\n--------- %s\n---------------" % (s))
-# self.lock.acquire()
- try:
- self.outf.write(s)
- self.outf.write(os.linesep)
- finally:
-# self.lock.release()
- pass
+ pylantorrent.log(logging.DEBUG, "printing\n--------- \n%s\n---------------" % (s))
+ self.outf.write(s)
+ self.outf.flush()
- def get_valid_vcons(self, destinations):
+ def _get_valid_vcons(self, destinations):
v_con_array = []
while len(destinations) > 0 and len(v_con_array) < self.degree:
ep = destinations.pop(0)
try:
- v_con = LTConnection(ep, self)
+ v_con = LTDestConnection(ep, self)
v_con_array.append(v_con)
except LTException, vex:
# i think this is the only recoverable error
@@ -123,19 +108,12 @@ def get_valid_vcons(self, destinations):
mine = destinations[ndx:end]
rem = 0
v_con.send_header(mine)
+ self.v_con_array = v_con_array
- return v_con_array
-
- def store_and_forward(self):
-
- header = self.json_header
- ex_array = []
- requests_a = header['requests']
-
+ def _open_dest_files(self, requests_a):
files_a = []
for req in requests_a:
filename = req['filename']
- rid = req['id']
try:
rn = req['rename']
if rn:
@@ -146,80 +124,93 @@ def store_and_forward(self):
except Exception, ex:
pylantorrent.log(logging.ERROR, "Failed to open %s" % (filename))
raise LTException(503, str(ex), header['host'], int(header['port']), reqs=requests_a)
+ self.files_a = files_a
+
+ # perhaps this should even be an io event system or threads. For now
+ # it will throttle on the one blocking socket from the data source
+ # and push the rest to the vcon objects
+ def _process_io(self):
+ md5er = hashlib.md5()
+ read_count = 0
+ bs = self.block_size
+ while read_count < self.data_length:
+ if bs + read_count > self.data_length:
+ bs = self.data_length - read_count
+ data = self.source_conn.read_data(bs)
+ if data == None:
+ raise Exception("Data is None prior to receiving full file %d %d" % (read_count, self.data_length))
+ md5er.update(data)
+ for v_con in self.v_con_array:
+ v_con.send(data)
+ for f in self.files_a:
+ f.write(data)
+ read_count = read_count + len(data)
+ self.md5str = str(md5er.hexdigest()).strip()
+ pylantorrent.log(logging.DEBUG, "We have received sent %d bytes. The md5sum is %s" % (read_count, self.md5str))
+
+ def store_and_forward(self):
+
+ self._read_header()
+ header = self.json_header
+ requests_a = header['requests']
+
+ self._open_dest_files(requests_a)
destinations = header['destinations']
- v_con_array = self.get_valid_vcons(destinations)
-
- try:
- md5er = hashlib.md5()
- read_count = 0
- bs = self.block_size
- data = "X" # fke data value to prime the loop
- while data and read_count < self.data_length:
- if bs + read_count > self.data_length:
- bs = self.data_length - read_count
- data = self.inf.read(bs)
- if data:
- md5er.update(data)
- for v_con in v_con_array:
- v_con.send(data)
- for f in files_a:
- f.write(data)
- read_count = read_count + len(data)
- md5str = str(md5er.hexdigest()).strip()
- except Exception, ex:
- for v_con in v_con_array:
- v_con.close()
- for f in files_a:
- f.close()
- raise ex
- for f in files_a:
- f.close()
+ self._get_valid_vcons(destinations)
+ self._process_io()
+
+ # close all open files
+ self._close_files()
+ # read the footer from the sending machine
+ self._read_footer()
+ # send foot to all machines this is streaming to
+ self._send_footer()
+ # wait for eof and close
+ self._close_connections()
+ self._rename_files(requests_a)
+
+ pylantorrent.log(logging.DEBUG, "All data sent %s %d" % (self.md5str, len(requests_a)))
+ # if we got to here it was successfully written to a file
+ # and we can call it success. Print out a success message for
+ # everyfile written
+ vex = LTException(0, "Success", header['host'], int(header['port']), requests_a, md5sum=self.md5str)
+ s = vex.get_printable()
+ self.print_results(s)
+ self.clean_up()
+ def _rename_files(self, requests_a):
for req in requests_a:
realname = req['filename']
rn = req['rename']
if rn:
tmpname = realname + self.suffix
+ pylantorrent.log(logging.DEBUG, "renaming %s -> %s" % (tmpname, realname))
+
os.rename(tmpname, realname)
self.created_files.remove(tmpname)
- # close all the connections
- for v_con in v_con_array:
- v_con.read_output()
- v_con.close()
-
- pylantorrent.log(logging.DEBUG, "All data sent %s %d" % (md5str, len(requests_a)))
- # if we got to here it was successfully written to a file
- # and we can call it success. Print out a success message for
- # everyfile written
- vex = LTException(0, "Success", header['host'], int(header['port']), requests_a, md5sum=md5str)
- s = vex.get_printable()
- self.print_results(s)
-
def main(argv=sys.argv[1:]):
pylantorrent.log(logging.INFO, "server starting")
v = None
rc = 1
+ v = LTServer(sys.stdin, sys.stdout)
try:
- v = LTServer(sys.stdin, sys.stdout)
v.store_and_forward()
rc = 0
except LTException, ve:
pylantorrent.log(logging.ERROR, "error %s" % (str(ve)), traceback)
s = ve.get_printable()
- print s
+ v.print_results(s)
+ v.clean_up()
except Exception, ex:
pylantorrent.log(logging.ERROR, "error %s" % (str(ex)), traceback)
vex = LTException(500, str(ex))
s = vex.get_printable()
- print s
- finally:
- if v != None:
- v.clean_up()
- print "EOD"
+ v.print_results(s)
+ v.clean_up()
return rc
View
73 lantorrent/tests/many_files_test.py
@@ -1,73 +0,0 @@
-import string
-import random
-import os
-import sys
-import nose.tools
-import sys
-import time
-import unittest
-import tempfile
-import filecmp
-import uuid
-import traceback
-from pylantorrent.client import *
-from pylantorrent.server import *
-
-
-
-class TestManyFiels(unittest.TestCase):
-
- def setUp(self):
- self.src_file = "/etc/group"
- self.src_size = os.path.getsize(self.src_file)
- self.ports_a = os.environ['LANTORRENT_TEST_PORTS'].split(",")
- self.files = []
-
- def tearDown(self):
- while len(self.files) > 0:
- f = self.files.pop(0)
- os.remove(f)
-
- def _t_new_dest_file(self):
- (osf, fname) = tempfile.mkstemp()
- self.files.append(fname)
- os.close(osf)
- return fname
-
- def _t_file_compare(self, f):
- rc = filecmp.cmp(self.src_file, f)
- self.assertTrue(rc)
-
- def _t_build_list(self):
- new_files = []
- dests = []
- for port in self.ports_a:
- fnames_a = []
- for i in range(0, 4):
- fname = self._t_new_dest_file()
- new_files.append(fname)
- fnames_a.append(fname)
-
- ent = pylantorrent.create_endpoint_entry("localhost", fnames_a, self.src_size, port=int(port))
- dests.append(ent)
- new_files.append(fname)
- print "sending to %s" % (str(fnames_a))
-
- top = dests.pop(0)
- top['destinations'] = dests
- print top
-
- return (top, new_files)
-
-
- def send_to_all_test(self):
- (top, new_files) = self._t_build_list()
-
- c = LTClient(self.src_file, top)
- v = LTServer(c, c)
- v.store_and_forward()
-
- for f in new_files:
- self._t_file_compare(f)
- print f
-
View
3 lantorrent/tests/many_xfer_test.py
@@ -39,7 +39,8 @@ def _t_new_dest(self, host, port, sz=128*1024):
def _t_file_compare(self, f):
rc = filecmp.cmp(self.src_file, f)
- self.assertTrue(rc)
+ self.assertTrue(rc, "%s not the same as %s" % (self.src_file, f))
+
def _t_build_list(self):
new_files = []
View
1 lantorrent/tests/xfer_test.py
@@ -42,6 +42,7 @@ def _t_xfer_one_file_check(self, sz=128*1024):
v = LTServer(c, c)
v.store_and_forward()
+ print "-> %s XX %s" % (self.src_file, fname)
rc = filecmp.cmp(self.src_file, fname)
self.assertTrue(rc)
finally:

0 comments on commit ee53c59

Please sign in to comment.
Something went wrong with that request. Please try again.