Permalink
Browse files

Merge branch 'http-speedup'

  • Loading branch information...
2 parents 12acd8a + 8a3e46c commit aec743c3b0caeb1257491492e6d5cbdac77d9b16 @mludvig mludvig committed Mar 10, 2013
Showing with 103 additions and 44 deletions.
  1. +71 −0 S3/ConnMan.py
  2. +32 −44 S3/S3.py
View
@@ -0,0 +1,71 @@
+import httplib
+from urlparse import urlparse
+from threading import Semaphore
+from logging import debug, info, warning, error
+
+from Config import Config
+from Exceptions import ParameterError
+
+__all__ = [ "ConnMan" ]
+
class http_connection(object):
    """One pooled HTTP(S) connection together with its book-keeping data.

    Attributes:
        hostname -- remote host this connection talks to
        ssl      -- True when the endpoint is HTTPS
        id       -- pool key under which ConnMan files this connection
        counter  -- number of requests served so far (ConnMan increments it)
        c        -- the underlying httplib connection object
    """
    def __init__(self, id, hostname, ssl, cfg):
        self.hostname = hostname
        self.ssl = ssl
        self.id = id
        self.counter = 0
        ## A configured proxy always gets a plain HTTP connection to the
        ## proxy itself; otherwise 'ssl' picks the connection class.
        if cfg.proxy_host != "":
            self.c = httplib.HTTPConnection(cfg.proxy_host, cfg.proxy_port)
        else:
            factory = ssl and httplib.HTTPSConnection or httplib.HTTPConnection
            self.c = factory(hostname)
+
class ConnMan(object):
    """Pool of keep-alive HTTP(S) connections, keyed by scheme and host.

    Thread-safe: all access to 'conn_pool' is serialized through
    'conn_pool_sem'.  Acquire/release is wrapped in try/finally so an
    unexpected exception cannot leak the semaphore and deadlock the pool.
    """
    conn_pool_sem = Semaphore()
    conn_pool = {}
    ## NOTE(review): the original comment claims AWS closes connections
    ## after ~90 requests, yet the cap is 800 -- confirm which is intended.
    conn_max_counter = 800 ## AWS closes connection after some ~90 requests

    @staticmethod
    def get(hostname, ssl = None):
        """Return a pooled or freshly opened connection to 'hostname'.

        'ssl' defaults to Config().use_https when not given.
        Raises ParameterError when SSL is requested together with a proxy
        (proxy connections are plain HTTP only).
        """
        cfg = Config()
        if ssl is None:
            ssl = cfg.use_https
        if cfg.proxy_host != "":
            if ssl:
                raise ParameterError("use_ssl=True can't be used with proxy")
            conn_id = "proxy://%s:%s" % (cfg.proxy_host, cfg.proxy_port)
        else:
            conn_id = "http%s://%s" % (ssl and "s" or "", hostname)
        conn = None
        ConnMan.conn_pool_sem.acquire()
        try:
            if conn_id not in ConnMan.conn_pool:
                ConnMan.conn_pool[conn_id] = []
            if len(ConnMan.conn_pool[conn_id]):
                conn = ConnMan.conn_pool[conn_id].pop()
                debug("ConnMan.get(): re-using connection: %s#%d" % (conn.id, conn.counter))
        finally:
            ConnMan.conn_pool_sem.release()
        if not conn:
            debug("ConnMan.get(): creating new connection: %s" % conn_id)
            conn = http_connection(conn_id, hostname, ssl, cfg)
            conn.c.connect()
        conn.counter += 1
        return conn

    @staticmethod
    def put(conn):
        """Return 'conn' to the pool, or close it when it must not be re-used.

        Proxy connections are always closed (keep-alive not supported yet),
        as are connections that served 'conn_max_counter' requests.
        """
        if conn.id.startswith("proxy://"):
            conn.c.close()
            debug("ConnMan.put(): closing proxy connection (keep-alive not yet supported)")
            return

        if conn.counter >= ConnMan.conn_max_counter:
            conn.c.close()
            debug("ConnMan.put(): closing over-used connection")
            return

        ConnMan.conn_pool_sem.acquire()
        try:
            ## setdefault() guards against a conn.id the pool has never
            ## seen -- plain indexing here raised KeyError.
            ConnMan.conn_pool.setdefault(conn.id, []).append(conn)
        finally:
            ConnMan.conn_pool_sem.release()
        debug("ConnMan.put(): connection put back to pool (%s#%d)" % (conn.id, conn.counter))
+
View
@@ -27,6 +27,7 @@
from Exceptions import *
from MultiPart import MultiPartUpload
from S3Uri import S3Uri
+from ConnMan import ConnMan
try:
import magic, gzip
@@ -190,15 +191,6 @@ class S3(object):
def __init__(self, config):
self.config = config
- def get_connection(self, bucket):
- if self.config.proxy_host != "":
- return httplib.HTTPConnection(self.config.proxy_host, self.config.proxy_port)
- else:
- if self.config.use_https:
- return httplib.HTTPSConnection(self.get_hostname(bucket))
- else:
- return httplib.HTTPConnection(self.get_hostname(bucket))
-
def get_hostname(self, bucket):
if bucket and check_bucket_name_dns_conformity(bucket):
if self.redir_map.has_key(bucket):
@@ -246,10 +238,9 @@ def _get_common_prefixes(data):
truncated = True
list = []
prefixes = []
- conn = self.get_connection(bucket)
while truncated:
- response = self.bucket_list_noparse(conn, bucket, prefix, recursive, uri_params)
+ response = self.bucket_list_noparse(bucket, prefix, recursive, uri_params)
current_list = _get_contents(response["data"])
current_prefixes = _get_common_prefixes(response["data"])
truncated = _list_truncated(response["data"])
@@ -263,19 +254,17 @@ def _get_common_prefixes(data):
list += current_list
prefixes += current_prefixes
- conn.close()
-
response['list'] = list
response['common_prefixes'] = prefixes
return response
- def bucket_list_noparse(self, connection, bucket, prefix = None, recursive = None, uri_params = {}):
def bucket_list_noparse(self, bucket, prefix = None, recursive = None, uri_params = None):
    """Issue one BUCKET_LIST request for 'bucket' and return the raw response.

    'uri_params' now defaults to a fresh dict per call: the original
    mutable default ({}) was mutated below ('prefix'/'delimiter'), so
    state leaked between calls that relied on the default.  Callers that
    pass their own dict are unaffected.
    """
    if uri_params is None:
        uri_params = {}
    if prefix:
        uri_params['prefix'] = self.urlencode_string(prefix)
    ## Non-recursive listings stop at the first "/" like a directory listing.
    if not self.config.recursive and not recursive:
        uri_params['delimiter'] = "/"
    request = self.create_request("BUCKET_LIST", bucket = bucket, **uri_params)
    response = self.send_request(request)
    #debug(response)
    return response
@@ -662,7 +651,7 @@ def _fail_wait(self, retries):
# Wait a few seconds. The more it fails the more we wait.
return (self._max_retries - retries + 1) * 3
- def send_request(self, request, body = None, retries = _max_retries, conn = None):
+ def send_request(self, request, body = None, retries = _max_retries):
method_string, resource, headers = request.get_triplet()
debug("Processing request, please wait...")
if not headers.has_key('content-length'):
@@ -671,25 +660,20 @@ def send_request(self, request, body = None, retries = _max_retries, conn = None
# "Stringify" all headers
for header in headers.keys():
headers[header] = str(headers[header])
- if conn is None:
- debug("Establishing connection")
- conn = self.get_connection(resource['bucket'])
- close_conn = True
- else:
- debug("Using existing connection")
- close_conn = False
+ conn = ConnMan.get(self.get_hostname(resource['bucket']))
uri = self.format_uri(resource)
debug("Sending request method_string=%r, uri=%r, headers=%r, body=(%i bytes)" % (method_string, uri, headers, len(body or "")))
- conn.request(method_string, uri, body, headers)
+ conn.c.request(method_string, uri, body, headers)
response = {}
- http_response = conn.getresponse()
+ http_response = conn.c.getresponse()
response["status"] = http_response.status
response["reason"] = http_response.reason
response["headers"] = convertTupleListToDict(http_response.getheaders())
response["data"] = http_response.read()
debug("Response: " + str(response))
- if close_conn is True:
- conn.close()
+ ConnMan.put(conn)
+ except ParameterError, e:
+ raise
except Exception, e:
if retries:
warning("Retrying failed request: %s (%s)" % (resource['uri'], e))
@@ -732,12 +716,13 @@ def send_file(self, request, file, labels, buffer = '', throttle = 0, retries =
info("Sending file '%s', please wait..." % file.name)
timestamp_start = time.time()
try:
- conn = self.get_connection(resource['bucket'])
- conn.connect()
- conn.putrequest(method_string, self.format_uri(resource))
+ conn = ConnMan.get(self.get_hostname(resource['bucket']))
+ conn.c.putrequest(method_string, self.format_uri(resource))
for header in headers.keys():
- conn.putheader(header, str(headers[header]))
- conn.endheaders()
+ conn.c.putheader(header, str(headers[header]))
+ conn.c.endheaders()
+ except ParameterError, e:
+ raise
except Exception, e:
if self.config.progress_meter:
progress.done("failed")
@@ -760,22 +745,24 @@ def send_file(self, request, file, labels, buffer = '', throttle = 0, retries =
else:
data = buffer
md5_hash.update(data)
- conn.send(data)
+ conn.c.send(data)
if self.config.progress_meter:
progress.update(delta_position = len(data))
size_left -= len(data)
if throttle:
time.sleep(throttle)
md5_computed = md5_hash.hexdigest()
response = {}
- http_response = conn.getresponse()
+ http_response = conn.c.getresponse()
response["status"] = http_response.status
response["reason"] = http_response.reason
response["headers"] = convertTupleListToDict(http_response.getheaders())
response["data"] = http_response.read()
response["size"] = size_total
- conn.close()
+ ConnMan.put(conn)
debug(u"Response: %s" % response)
+ except ParameterError, e:
+ raise
except Exception, e:
if self.config.progress_meter:
progress.done("failed")
@@ -797,7 +784,7 @@ def send_file(self, request, file, labels, buffer = '', throttle = 0, retries =
response["speed"] = response["elapsed"] and float(response["size"]) / response["elapsed"] or float(-1)
if self.config.progress_meter:
- ## The above conn.close() takes some time -> update() progress meter
+ ## Finalising the upload takes some time -> update() progress meter
## to correct the average speed. Otherwise people will complain that
## 'progress' and response["speed"] are inconsistent ;-)
progress.update()
@@ -872,21 +859,22 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
info("Receiving file '%s', please wait..." % stream.name)
timestamp_start = time.time()
try:
- conn = self.get_connection(resource['bucket'])
- conn.connect()
- conn.putrequest(method_string, self.format_uri(resource))
+ conn = ConnMan.get(self.get_hostname(resource['bucket']))
+ conn.c.putrequest(method_string, self.format_uri(resource))
for header in headers.keys():
- conn.putheader(header, str(headers[header]))
+ conn.c.putheader(header, str(headers[header]))
if start_position > 0:
debug("Requesting Range: %d .. end" % start_position)
- conn.putheader("Range", "bytes=%d-" % start_position)
- conn.endheaders()
+ conn.c.putheader("Range", "bytes=%d-" % start_position)
+ conn.c.endheaders()
response = {}
- http_response = conn.getresponse()
+ http_response = conn.c.getresponse()
response["status"] = http_response.status
response["reason"] = http_response.reason
response["headers"] = convertTupleListToDict(http_response.getheaders())
debug("Response: %s" % response)
+ except ParameterError, e:
+ raise
except Exception, e:
if self.config.progress_meter:
progress.done("failed")
@@ -938,7 +926,7 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
## Call progress meter from here...
if self.config.progress_meter:
progress.update(delta_position = len(data))
- conn.close()
+ ConnMan.put(conn)
except Exception, e:
if self.config.progress_meter:
progress.done("failed")

0 comments on commit aec743c

Please sign in to comment.