Parallel workers to cp and mv command. Backward python 2.5.2 compatibility. #2

Open
wants to merge 6 commits into
from
View
@@ -77,6 +77,9 @@ class Config(object):
parallel = False
workers = 10
follow_symlinks=False
+ select_dir = False
+ max_retries = 5
+ retry_delay = 3
## Creating a singleton
def __new__(self, configfile = None):
View
@@ -47,7 +47,7 @@ def update_timestamp(self):
def format_param_str(self):
"""
Format URL parameters from self.params and returns
- ?parm1=val1&parm2=val2 or an empty string if there
+ ?parm1=val1&parm2=val2 or an empty string if there
are no parameters. Output of this function should
be appended directly to self.resource['uri']
"""
@@ -119,9 +119,6 @@ class S3(object):
## S3 sometimes sends HTTP-307 response
redir_map = {}
- ## Maximum attempts of re-issuing failed requests
- _max_retries = 5
-
def __init__(self, config):
self.config = config
@@ -458,9 +455,11 @@ def create_request(self, operation, uri = None, bucket = None, object = None, he
def _fail_wait(self, retries):
# Wait a few seconds. The more it fails the more we wait.
- return (self._max_retries - retries + 1) * 3
+ return (self.config.max_retries - retries + 1) * self.config.retry_delay
- def send_request(self, request, body = None, retries = _max_retries):
+ def send_request(self, request, body = None, retries = -1):
+ if retries == -1:
+ retries = self.config.max_retries
method_string, resource, headers = request.get_triplet()
debug("Processing request, please wait...")
if not headers.has_key('content-length'):
@@ -479,8 +478,11 @@ def send_request(self, request, body = None, retries = _max_retries):
except Exception, e:
if retries:
warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
- warning("Waiting %d sec..." % self._fail_wait(retries))
- time.sleep(self._fail_wait(retries))
+
+ if self._fail_wait(retries) > 0:
+ warning("Waiting %d sec..." % self._fail_wait(retries))
+ time.sleep(self._fail_wait(retries))
+
return self.send_request(request, body, retries - 1)
else:
raise S3RequestError("Request failed for: %s" % resource['uri'])
@@ -498,8 +500,11 @@ def send_request(self, request, body = None, retries = _max_retries):
if retries:
warning(u"Retrying failed request: %s" % resource['uri'])
warning(unicode(e))
- warning("Waiting %d sec..." % self._fail_wait(retries))
- time.sleep(self._fail_wait(retries))
+
+ if self._fail_wait(retries) > 0:
+ warning("Waiting %d sec..." % self._fail_wait(retries))
+ time.sleep(self._fail_wait(retries))
+
return self.send_request(request, body, retries - 1)
else:
raise e
@@ -509,7 +514,9 @@ def send_request(self, request, body = None, retries = _max_retries):
return response
- def send_file(self, request, file, labels, throttle = 0, retries = _max_retries):
+ def send_file(self, request, file, labels, throttle = 0, retries = -1):
+ if retries == -1:
+ retries = self.config.max_retries
method_string, resource, headers = request.get_triplet()
size_left = size_total = headers.get("content-length")
if self.config.progress_meter:
@@ -529,8 +536,11 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
progress.done("failed")
if retries:
warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
- warning("Waiting %d sec..." % self._fail_wait(retries))
- time.sleep(self._fail_wait(retries))
+
+ if self._fail_wait(retries) > 0:
+ warning("Waiting %d sec..." % self._fail_wait(retries))
+ time.sleep(self._fail_wait(retries))
+
# Connection error -> same throttle value
return self.send_file(request, file, labels, throttle, retries - 1)
else:
@@ -562,12 +572,15 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
if self.config.progress_meter:
progress.done("failed")
if retries:
- if retries < self._max_retries:
+ if retries < self.config.max_retries:
throttle = throttle and throttle * 5 or 0.01
warning("Upload failed: %s (%s)" % (resource['uri'], e))
warning("Retrying on lower speed (throttle=%0.2f)" % throttle)
- warning("Waiting %d sec..." % self._fail_wait(retries))
- time.sleep(self._fail_wait(retries))
+
+ if self._fail_wait(retries) > 0:
+ warning("Waiting %d sec..." % self._fail_wait(retries))
+ time.sleep(self._fail_wait(retries))
+
# Connection error -> same throttle value
return self.send_file(request, file, labels, throttle, retries - 1)
else:
@@ -612,8 +625,11 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
if try_retry:
if retries:
warning("Upload failed: %s (%s)" % (resource['uri'], S3Error(response)))
- warning("Waiting %d sec..." % self._fail_wait(retries))
- time.sleep(self._fail_wait(retries))
+
+ if self._fail_wait(retries) > 0:
+ warning("Waiting %d sec..." % self._fail_wait(retries))
+ time.sleep(self._fail_wait(retries))
+
return self.send_file(request, file, labels, throttle, retries - 1)
else:
warning("Too many failures. Giving up on '%s'" % (file.name))
@@ -634,7 +650,9 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
return response
- def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
+ def recv_file(self, request, stream, labels, start_position = 0, retries = -1):
+ if retries == -1:
+ retries = self.config.max_retries
method_string, resource, headers = request.get_triplet()
if self.config.progress_meter:
progress = self.config.progress_class(labels, 0)
@@ -662,8 +680,11 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
progress.done("failed")
if retries:
warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
- warning("Waiting %d sec..." % self._fail_wait(retries))
- time.sleep(self._fail_wait(retries))
+
+ if self._fail_wait(retries) > 0:
+ warning("Waiting %d sec..." % self._fail_wait(retries))
+ time.sleep(self._fail_wait(retries))
+
# Connection error -> same throttle value
return self.recv_file(request, stream, labels, start_position, retries - 1)
else:
@@ -711,8 +732,11 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
progress.done("failed")
if retries:
warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
- warning("Waiting %d sec..." % self._fail_wait(retries))
- time.sleep(self._fail_wait(retries))
+
+ if self._fail_wait(retries) > 0:
+ warning("Waiting %d sec..." % self._fail_wait(retries))
+ time.sleep(self._fail_wait(retries))
+
# Connection error -> same throttle value
return self.recv_file(request, stream, labels, current_position, retries - 1)
else:
Oops, something went wrong.