Skip to content

Commit

Permalink
Merge commit '0771f'
Browse files Browse the repository at this point in the history
  • Loading branch information
bdarnell committed Apr 14, 2013
2 parents 8ca13ef + 0771f8c commit 0d80ca7
Showing 1 changed file with 31 additions and 23 deletions.
54 changes: 31 additions & 23 deletions tornado/simple_httpclient.py
Expand Up @@ -92,10 +92,12 @@ def _process_queue(self):
request, callback = self.queue.popleft() request, callback = self.queue.popleft()
key = object() key = object()
self.active[key] = (request, callback) self.active[key] = (request, callback)
_HTTPConnection(self.io_loop, self, request, release_callback = functools.partial(self._release_fetch, key)
functools.partial(self._release_fetch, key), self._handle_request(request, release_callback, callback)
callback,
self.max_buffer_size, self.resolver) def _handle_request(self, request, release_callback, final_callback):
_HTTPConnection(self.io_loop, self, request, release_callback,
final_callback, self.max_buffer_size, self.resolver)


def _release_fetch(self, key): def _release_fetch(self, key):
del self.active[key] del self.active[key]
Expand Down Expand Up @@ -153,8 +155,21 @@ def __init__(self, io_loop, client, request, release_callback,
self.resolver.resolve(host, port, af, callback=self._on_resolve) self.resolver.resolve(host, port, af, callback=self._on_resolve)


def _on_resolve(self, addrinfo): def _on_resolve(self, addrinfo):
af, sockaddr = addrinfo[0] self.stream = self._create_stream(addrinfo)
timeout = min(self.request.connect_timeout, self.request.request_timeout)
if timeout:
self._timeout = self.io_loop.add_timeout(
self.start_time + timeout,
stack_context.wrap(self._on_timeout))
self.stream.set_close_callback(self._on_close)
# ipv6 addresses are broken (in self.parsed.hostname) until
# 2.7, here is correctly parsed value calculated in __init__
sockaddr = addrinfo[0][1]
self.stream.connect(sockaddr, self._on_connect,
server_hostname=self.parsed_hostname)


def _create_stream(self, addrinfo):
af = addrinfo[0][0]
if self.parsed.scheme == "https": if self.parsed.scheme == "https":
ssl_options = {} ssl_options = {}
if self.request.validate_cert: if self.request.validate_cert:
Expand Down Expand Up @@ -187,24 +202,14 @@ def _on_resolve(self, addrinfo):
# information. # information.
ssl_options["ssl_version"] = ssl.PROTOCOL_SSLv3 ssl_options["ssl_version"] = ssl.PROTOCOL_SSLv3


self.stream = SSLIOStream(socket.socket(af), return SSLIOStream(socket.socket(af),
io_loop=self.io_loop, io_loop=self.io_loop,
ssl_options=ssl_options, ssl_options=ssl_options,
max_buffer_size=self.max_buffer_size) max_buffer_size=self.max_buffer_size)
else: else:
self.stream = IOStream(socket.socket(af), return IOStream(socket.socket(af),
io_loop=self.io_loop, io_loop=self.io_loop,
max_buffer_size=self.max_buffer_size) max_buffer_size=self.max_buffer_size)
timeout = min(self.request.connect_timeout, self.request.request_timeout)
if timeout:
self._timeout = self.io_loop.add_timeout(
self.start_time + timeout,
stack_context.wrap(self._on_timeout))
self.stream.set_close_callback(self._on_close)
# ipv6 addresses are broken (in self.parsed.hostname) until
# 2.7, here is correctly parsed value calculated in __init__
self.stream.connect(sockaddr, self._on_connect,
server_hostname=self.parsed_hostname)


def _on_timeout(self): def _on_timeout(self):
self._timeout = None self._timeout = None
Expand Down Expand Up @@ -415,7 +420,7 @@ def _on_body(self, data):
self.final_callback = None self.final_callback = None
self._release() self._release()
self.client.fetch(new_request, final_callback) self.client.fetch(new_request, final_callback)
self.stream.close() self._on_end_request()
return return
if self._decompressor: if self._decompressor:
data = (self._decompressor.decompress(data) + data = (self._decompressor.decompress(data) +
Expand All @@ -435,6 +440,9 @@ def _on_body(self, data):
buffer=buffer, buffer=buffer,
effective_url=self.request.url) effective_url=self.request.url)
self._run_callback(response) self._run_callback(response)
self._on_end_request()

def _on_end_request(self):
self.stream.close() self.stream.close()


def _on_chunk_length(self, data): def _on_chunk_length(self, data):
Expand Down

0 comments on commit 0d80ca7

Please sign in to comment.