
Add parallel workers to cp and mv command

1 parent 9a65f78 · commit 1b22cd7c4c0d6342b48888e2311260ecb59aa2bb · @cyno committed Aug 2, 2011
Showing with 60 additions and 16 deletions.
  1. +60 −16 s3cmd
s3cmd
@@ -381,7 +381,7 @@ def cmd_object_put(args):
         #Necessary to ensure KeyboardInterrupt can actually kill
         #Otherwise Queue.join() blocks until all queue elements have completed
-        while threading.active_count() > 1:
+        while threading.activeCount() > 1:
             time.sleep(.1)
         q.join()
@@ -532,7 +532,7 @@ def cmd_object_get(args):
         #Necessary to ensure KeyboardInterrupt can actually kill
         #Otherwise Queue.join() blocks until all queue elements have completed
-        while threading.active_count() > 1:
+        while threading.activeCount() > 1:
             time.sleep(.1)
         q.join()
@@ -689,20 +689,64 @@ def subcmd_cp_mv(args, process_fce, action_str, message):
         warning(u"Exitting now because of --dry-run")
         return
-    seq = 0
-    for key in remote_list:
-        seq += 1
-        seq_label = "[%d of %d]" % (seq, remote_count)
+    if cfg.parallel and len(remote_list) > 1:
+        #Disabling progress metter for parallel downloads.
+        cfg.progress_meter = False
+        #Initialize Queue
+        global q
+        q = Queue.Queue()
+
+        seq = 0
+        for key in remote_list:
+            seq += 1
+            item = remote_list[key]
+            src_uri = S3Uri(item['object_uri_str'])
+            dst_uri = S3Uri(item['dest_name'])
+            extra_headers = copy(cfg.extra_headers)
+            q.put([src_uri,dst_uri,extra_headers,process_fce,message])
-        item = remote_list[key]
-        src_uri = S3Uri(item['object_uri_str'])
-        dst_uri = S3Uri(item['dest_name'])
+        for i in range(cfg.workers):
+            t = threading.Thread(target=cp_mv_worker)
+            t.daemon = True
+            t.start()
-        extra_headers = copy(cfg.extra_headers)
-        response = process_fce(src_uri, dst_uri, extra_headers)
-        output(message % { "src" : src_uri, "dst" : dst_uri })
-        if Config().acl_public:
-            info(u"Public URL is: %s" % dst_uri.public_url())
+        #Necessary to ensure KeyboardInterrupt can actually kill
+        #Otherwise Queue.join() blocks until all queue elements have completed
+        while threading.activeCount() > 1:
+            time.sleep(.1)
+
+        q.join()
+    else:
+        seq = 0
+        for key in remote_list:
+            seq += 1
+            seq_label = "[%d of %d]" % (seq, remote_count)
+
+            item = remote_list[key]
+            src_uri = S3Uri(item['object_uri_str'])
+            dst_uri = S3Uri(item['dest_name'])
+
+            extra_headers = copy(cfg.extra_headers)
+            response = process_fce(src_uri, dst_uri, extra_headers)
+            output(message % { "src" : src_uri, "dst" : dst_uri })
+            if Config().acl_public:
+                info(u"Public URL is: %s" % dst_uri.public_url())
+
+def cp_mv_worker():
+    while True:
+        try:
+            (src_uri,dst_uri,extra_headers,process_fce,message) = q.get_nowait()
+        except Queue.Empty:
+            return
+        try:
+            response = process_fce(src_uri, dst_uri, extra_headers)
+            output(message % { "src" : src_uri, "dst" : dst_uri })
+            if Config().acl_public:
+                info(u"Public URL is: %s" % dst_uri.public_url())
+        except Exception, e:
+            report_exception(e)
+            exit
+        q.task_done()
 def cmd_cp(args):
     s3 = S3(Config())
@@ -997,7 +1041,7 @@ def cmd_sync_remote2local(args):
         #Necessary to ensure KeyboardInterrupt can actually kill
         #Otherwise Queue.join() blocks until all queue elements have completed
-        while threading.active_count() > 1:
+        while threading.activeCount() > 1:
             time.sleep(.1)
         q.join()
@@ -1191,7 +1235,7 @@ def cmd_sync_local2remote(args):
         #Necessary to ensure KeyboardInterrupt can actually kill
         #Otherwise Queue.join() blocks until all queue elements have completed
-        while threading.active_count() > 1:
+        while threading.activeCount() > 1:
             time.sleep(.1)
         q.join()
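
The change reuses the queue-plus-daemon-workers pattern that the put/get/sync paths above already follow: enqueue every (src, dst) pair up front, start cfg.workers daemon threads that drain the queue with get_nowait(), and have the main thread poll threading.activeCount() before q.join() so a KeyboardInterrupt can still get through. Below is a minimal, self-contained sketch of that pattern; do_copy() and the hard-coded worker count are illustrative stand-ins, not s3cmd's process_fce or cfg.workers.

    # Sketch only: queue + daemon workers + activeCount() polling (Python 2 era,
    # matching the code above). do_copy() is a hypothetical stand-in.
    import Queue
    import threading
    import time

    q = Queue.Queue()

    def do_copy(src, dst):
        # Stand-in for the real per-object work (process_fce in the diff).
        time.sleep(0.2)
        print("copied %s -> %s" % (src, dst))

    def worker():
        while True:
            try:
                src, dst = q.get_nowait()
            except Queue.Empty:
                return                 # queue drained, thread exits
            try:
                do_copy(src, dst)
            finally:
                q.task_done()          # always account for the item

    # Enqueue all work before starting a fixed pool of daemon threads.
    for n in range(10):
        q.put(("s3://bucket/src%d" % n, "s3://bucket/dst%d" % n))

    for _ in range(4):                 # the commit uses cfg.workers threads
        t = threading.Thread(target=worker)
        t.daemon = True
        t.start()

    # Poll while worker threads are alive instead of blocking on q.join() right
    # away: a blocked join() keeps KeyboardInterrupt from being delivered
    # promptly, which is what the comments in the diff are guarding against.
    while threading.activeCount() > 1:
        time.sleep(.1)

    q.join()

Because all items are enqueued before any worker starts, a worker that sees Queue.Empty can safely exit, and the daemon flag means any threads still running do not block interpreter shutdown after a Ctrl-C.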
