Permalink
Browse files

* S3/ConnMan.py: New Connection Manager

* s3cmd: Use ConnMan instead of S3.get_connection()
  (doesn't yet support proxies!)



git-svn-id: https://s3tools.svn.sourceforge.net/svnroot/s3tools/s3cmd/branches/s3cmd-speedup@363 830e0280-6d2a-0410-9c65-932aecc39d9d
  • Loading branch information...
1 parent 7d965a5 commit 149226b4bf6c0bcd510a57f92aa1d420db82a32a @mludvig mludvig committed Jan 29, 2009
Showing with 88 additions and 20 deletions.
  1. +6 −0 ChangeLog
  2. +57 −0 S3/ConnMan.py
  3. +25 −20 S3/S3.py
View
@@ -1,3 +1,9 @@
+2009-01-30 Michal Ludvig <michal@logix.cz>
+
+ * S3/ConnMan.py: New Connection Manager
+ * s3cmd: Use ConnMan instead of S3.get_connection()
+ (doesn't yet support proxies!)
+
2009-01-28 Michal Ludvig <michal@logix.cz>
* s3cmd: Output 'delete:' in --dry-run only when
View
@@ -0,0 +1,57 @@
+import httplib
+from urlparse import urlparse
+from threading import Semaphore
+from logging import debug, info, warning, error
+
+from Config import Config
+
+__all__ = [ "ConnMan" ]
+
+class http_connection(object):
+ def __init__(self, id, hostname, ssl):
+ self.hostname = hostname
+ self.ssl = ssl
+ self.id = id
+ self.counter = 0
+ if not ssl:
+ self.c = httplib.HTTPConnection(hostname)
+ else:
+ self.c = httplib.HTTPSConnection(hostname)
+
+class ConnMan(object):
+ conn_pool_sem = Semaphore()
+ conn_pool = {}
+ conn_max_counter = 800 ## AWS closes connection after some ~90 requests
+
+ @staticmethod
+ def get(hostname, ssl = None):
+ if ssl == None:
+ ssl = Config().use_https
+ conn = None
+ conn_id = "http%s://%s" % (ssl and "s" or "", hostname)
+ ConnMan.conn_pool_sem.acquire()
+ if not ConnMan.conn_pool.has_key(conn_id):
+ ConnMan.conn_pool[conn_id] = []
+ if len(ConnMan.conn_pool[conn_id]):
+ conn = ConnMan.conn_pool[conn_id].pop()
+ debug("ConnMan.get(): re-using connection: %s#%d" % (conn.id, conn.counter))
+ ConnMan.conn_pool_sem.release()
+ if not conn:
+ debug("ConnMan.get(): creating new connection: %s" % conn_id)
+ conn = http_connection(conn_id, hostname, ssl)
+ conn.c.connect()
+ conn.counter += 1
+ return conn
+
+ @staticmethod
+ def put(conn):
+ if conn.counter >= ConnMan.conn_max_counter:
+ conn.c.close()
+ debug("ConnMan.put(): closing over-used connection")
+ return
+
+ ConnMan.conn_pool_sem.acquire()
+ ConnMan.conn_pool[conn.id].append(conn)
+ ConnMan.conn_pool_sem.release()
+ debug("ConnMan.put(): connection put back to pool (%s#%d)" % (conn.id, conn.counter))
+
View
@@ -26,6 +26,7 @@
from Config import Config
from Exceptions import *
from ACL import ACL
+from ConnMan import ConnMan
class S3(object):
http_methods = BidirMap(
@@ -357,16 +358,17 @@ def send_request(self, request, body = None, retries = _max_retries):
method_string, resource, headers = request
debug("Processing request, please wait...")
try:
- conn = self.get_connection(resource['bucket'])
- conn.request(method_string, self.format_uri(resource), body, headers)
+ #conn = self.get_connection(resource['bucket'])
+ conn = ConnMan.get(self.get_hostname(resource['bucket']))
+ conn.c.request(method_string, self.format_uri(resource), body, headers)
response = {}
- http_response = conn.getresponse()
+ http_response = conn.c.getresponse()
response["status"] = http_response.status
response["reason"] = http_response.reason
response["headers"] = convertTupleListToDict(http_response.getheaders())
response["data"] = http_response.read()
debug("Response: " + str(response))
- conn.close()
+ ConnMan.put(conn)
except Exception, e:
if retries:
warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
@@ -409,12 +411,13 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
info("Sending file '%s', please wait..." % file.name)
timestamp_start = time.time()
try:
- conn = self.get_connection(resource['bucket'])
- conn.connect()
- conn.putrequest(method_string, self.format_uri(resource))
+ #conn = self.get_connection(resource['bucket'])
+ #conn.connect()
+ conn = ConnMan.get(self.get_hostname(resource['bucket']))
+ conn.c.putrequest(method_string, self.format_uri(resource))
for header in headers.keys():
- conn.putheader(header, str(headers[header]))
- conn.endheaders()
+ conn.c.putheader(header, str(headers[header]))
+ conn.c.endheaders()
except Exception, e:
if self.config.progress_meter:
progress.done("failed")
@@ -433,21 +436,21 @@ def send_file(self, request, file, labels, throttle = 0, retries = _max_retries)
#debug("SendFile: Reading up to %d bytes from '%s'" % (self.config.send_chunk, file.name))
data = file.read(self.config.send_chunk)
md5_hash.update(data)
- conn.send(data)
+ conn.c.send(data)
if self.config.progress_meter:
progress.update(delta_position = len(data))
size_left -= len(data)
if throttle:
time.sleep(throttle)
md5_computed = md5_hash.hexdigest()
response = {}
- http_response = conn.getresponse()
+ http_response = conn.c.getresponse()
response["status"] = http_response.status
response["reason"] = http_response.reason
response["headers"] = convertTupleListToDict(http_response.getheaders())
response["data"] = http_response.read()
response["size"] = size_total
- conn.close()
+ ConnMan.put(conn)
debug(u"Response: %s" % response)
except Exception, e:
if self.config.progress_meter:
@@ -522,17 +525,18 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
info("Receiving file '%s', please wait..." % stream.name)
timestamp_start = time.time()
try:
- conn = self.get_connection(resource['bucket'])
- conn.connect()
- conn.putrequest(method_string, self.format_uri(resource))
+ #conn = self.get_connection(resource['bucket'])
+ #conn.connect()
+ conn = ConnMan.get(self.get_hostname(resource['bucket']))
+ conn.c.putrequest(method_string, self.format_uri(resource))
for header in headers.keys():
- conn.putheader(header, str(headers[header]))
+ conn.c.putheader(header, str(headers[header]))
if start_position > 0:
debug("Requesting Range: %d .. end" % start_position)
- conn.putheader("Range", "bytes=%d-" % start_position)
- conn.endheaders()
+ conn.c.putheader("Range", "bytes=%d-" % start_position)
+ conn.c.endheaders()
response = {}
- http_response = conn.getresponse()
+ http_response = conn.c.getresponse()
response["status"] = http_response.status
response["reason"] = http_response.reason
response["headers"] = convertTupleListToDict(http_response.getheaders())
@@ -585,7 +589,8 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
## Call progress meter from here...
if self.config.progress_meter:
progress.update(delta_position = len(data))
- conn.close()
+ #conn.close()
+ ConnMan.put(conn)
except Exception, e:
if self.config.progress_meter:
progress.done("failed")

0 comments on commit 149226b

Please sign in to comment.