This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

Jconnor threaded downloader #8

Merged: 6 commits, Sep 3, 2013
Changes from 5 commits
194 changes: 88 additions & 106 deletions nectar/downloaders/threaded.py
@@ -22,11 +22,6 @@

import requests

try:
from requests.packages.urllib3.connectionpool import ClosedPoolError
except ImportError:
from urllib3.connectionpool import ClosedPoolError

from nectar.downloaders.base import Downloader
from nectar.report import DownloadReport, DOWNLOAD_SUCCEEDED

@@ -59,10 +54,21 @@ def __str__(self):

class HTTPThreadedDownloader(Downloader):
"""
Downloader class that uses the third party Eventlets with Requests to handle
Downloader class that uses native Python threads with Requests to handle
HTTP, HTTPS and proxied download requests by the server.
"""

def __init__(self, config, event_listener=None):
super(HTTPThreadedDownloader, self).__init__(config, event_listener)

# throttling support
self._bytes_lock = threading.Lock()
self._bytes_this_second = 0
self._time_bytes_this_second_was_cleared = datetime.datetime.now()

# thread-safety when firing events
self._event_lock = threading.Lock()

@property
def buffer_size(self):
return self.config.buffer_size or DEFAULT_BUFFER_SIZE
@@ -76,88 +82,42 @@ def progress_interval(self):
seconds = self.config.progress_interval or DEFAULT_PROGRESS_INTERVAL
return datetime.timedelta(seconds=seconds)

def progress_reporter(self, queue):
"""
Useful in a thread to fire reports. See below for the sentinel value that
signals the thread to end.

:param queue: queue that will contain tuples where the first member
is a DownloadReport, and the second member is a function
to call with that report. The function should handle
the firing of that report. If the first member is anything
besides a DownloadReport, this function will return.
:type queue: Queue.Queue
"""
while True:
report, report_func = queue.get()
if not isinstance(report, DownloadReport):
# done!
break
report_func(report)
queue.task_done()

Contributor: we have an extra blank line here

Contributor Author: These are lines that were removed, not added

def worker(self, queue, queue_ready, session, report_queue):
def worker(self, queue):
"""
:param queue: queue of DownloadRequest instances
:type queue: Queue.Queue
:param queue_ready: threading event that signals when the queue has been
sufficiently populated that it is ready for workers
to start
:type queue_ready: threading.Event
:param session: Session instance
:type session: requests.sessions.Session
:param report_queue: queue where DownloadReport instances can be dropped
for reporting. Each item added should be a tuple
with the first member a DownloadReport instance, and
the second member a function to call with that report
:type report_queue: Queue.Queue
:type queue: WorkerQueue

"""
queue_ready.wait()
while not self.is_canceled:
try:
request = queue.get_nowait()
except Queue.Empty:
break
self._fetch(request, session, report_queue)
queue.task_done()
try:
session = build_session(self.config)

def feed_queue(self, queue, queue_ready, request_list):
"""
takes DownloadRequests off of an iterator (which could be a generator),
and adds them to a queue. This is only useful if the queue has a size
limit. Sets queue_ready when the queue is full enough for workers to start.
"""
for request in request_list:
if self.is_canceled:
break
queue.put(request)
if queue.full() and not queue_ready.is_set():
queue_ready.set()
# call again in case we never filled up the queue
if not queue_ready.is_set():
queue_ready.set()
while True:
request = queue.get()

if request is None:
session.close()
break

self._fetch(request, session)

except:
_LOG.exception('Unhandled Exception in Worker Thread [%s]' % threading.currentThread().ident)

def download(self, request_list):
session = build_session(self.config)
queue = Queue.Queue(maxsize=self.max_concurrent*3)
queue_ready = threading.Event()
report_queue = Queue.Queue()
_LOG.debug('starting feed queue thread')
feeder = threading.Thread(target=self.feed_queue, args=[queue, queue_ready, request_list])
threading.Thread(target=self.progress_reporter, args=[report_queue]).start()
worker_threads = []
queue = WorkerQueue(request_list)

_LOG.debug('starting workers')
for i in range(self.max_concurrent):
threading.Thread(target=self.worker, args=[queue, queue_ready, session, report_queue]).start()

feeder.start()
feeder.join()
worker_thread = threading.Thread(target=self.worker, args=[queue])
worker_thread.setDaemon(True)
worker_thread.start()
worker_threads.append(worker_thread)

queue.join()
report_queue.join()
report_queue.put((True, None))
session.close()
for thread in worker_threads:
thread.join()

@staticmethod
def chunk_generator(raw, chunk_size):
@@ -179,7 +139,7 @@ def chunk_generator(raw, chunk_size):
break
yield chunk

def _fetch(self, request, session, report_queue):
def _fetch(self, request, session):
"""
:param request: download request object with details about what to
download and where to put it
@@ -192,12 +152,14 @@ def _fetch(self, request, session, report_queue):
# file. In that case, we must ignore the declared encoding and thus prevent
# the requests library from automatically decompressing the file.
parse_url = urlparse.urlparse(request.url)

if parse_url.path.endswith('.gz'):
ignore_encoding = True
# declare that we don't accept any encodings, so that if we do still
# get a content-encoding value in the response, we know for sure the
# other end is broken/misbehaving.
headers = {'accept-encoding': ''}

else:
ignore_encoding = False
headers = None
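
A minimal standalone sketch of the same trick, separate from this diff (the URL is hypothetical): asking for no content-encoding and reading from response.raw keeps an already-gzipped file byte-for-byte intact on disk.

import requests

url = 'https://example.com/repodata/primary.xml.gz'  # hypothetical URL

# stream=True defers the body so response.raw stays readable
response = requests.get(url, headers={'accept-encoding': ''}, stream=True)

with open('primary.xml.gz', 'wb') as fh:
    while True:
        # raw.read() bypasses requests' automatic decompression, unlike
        # iter_content(), which would decode a declared content-encoding
        chunk = response.raw.read(8192)
        if not chunk:
            break
        fh.write(chunk)
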
@@ -210,25 +172,13 @@

report = DownloadReport.from_download_request(request)
report.download_started()
report_queue.put((report, self.fire_download_started))

retries = DEFAULT_RETRIES
self.fire_download_started(report)

try:
if self.is_canceled:
raise DownloadCancelled(request.url)

response = None

while True:
try:
response = session.get(request.url, headers=headers)
except ClosedPoolError:
retries -= 1
if retries <= 0:
raise
else:
break
response = session.get(request.url, headers=headers)

if response.status_code != httplib.OK:
raise DownloadFailed(request.url, response.status_code, response.reason)
@@ -237,7 +187,7 @@ def _fetch(self, request, session, report_queue):
file_handle = request.initialize_file_handle()

last_update_time = datetime.datetime.now()
report_queue.put((report, self.fire_download_progress))
self.fire_download_progress(report)

if ignore_encoding:
chunks = self.chunk_generator(response.raw, self.buffer_size)
@@ -257,23 +207,23 @@

if now - last_update_time >= progress_interval:
last_update_time = now
report_queue.put((report, self.fire_download_progress))
self.fire_download_progress(report)

with session.nectar_bytes_lock:
if now - session.nectar_time_bytes_this_second_was_cleared >= ONE_SECOND:
session.nectar_bytes_this_second = 0
session.nectar_time_bytes_this_second_was_cleared = now
session.nectar_bytes_this_second += bytes_read
with self._bytes_lock:
if now - self._time_bytes_this_second_was_cleared >= ONE_SECOND:
self._bytes_this_second = 0
self._time_bytes_this_second_was_cleared = now
self._bytes_this_second += bytes_read

if max_speed is not None and session.nectar_bytes_this_second >= max_speed:
if max_speed is not None and self._bytes_this_second >= max_speed:
# it's not worth doing fancier mathematics than this, very
# fine-grained sleep times [1] are not honored by the system
# [1] for example, sleeping the remaining fraction of time
# before this second is up
time.sleep(0.5)

# guarantee 1 report at the end
report_queue.put((report, self.fire_download_progress))
self.fire_download_progress(report)

except DownloadCancelled, e:
_LOG.debug(str(e))
@@ -294,13 +244,20 @@ def _fetch(self, request, session, report_queue):
else:
report.download_succeeded()

finally:
request.finalize_file_handle()
request.finalize_file_handle()

if report.state is DOWNLOAD_SUCCEEDED:
report_queue.put((report, self.fire_download_succeeded))
self.fire_download_succeeded(report)
else: # DOWNLOAD_FAILED
report_queue.put((report, self.fire_download_failed))
self.fire_download_failed(report)

# used for testing purposes
return report

def _fire_event_to_listener(self, event_listener_callback, *args, **kwargs):
# thread-safe event firing
with self._event_lock:
super(HTTPThreadedDownloader, self)._fire_event_to_listener(event_listener_callback, *args, **kwargs)
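
The byte counting in _fetch above amounts to a one-second windowed throttle, kept per downloader instance now that the counters no longer ride on the shared session. A standalone sketch of the same logic, under a hypothetical name:

import datetime
import threading
import time

ONE_SECOND = datetime.timedelta(seconds=1)

class WindowedThrottle(object):  # hypothetical name; mirrors _fetch's counters

    def __init__(self, max_bytes_per_second):
        self.max_bytes_per_second = max_bytes_per_second
        self._lock = threading.Lock()
        self._bytes_this_second = 0
        self._window_started = datetime.datetime.now()

    def record(self, bytes_read):
        now = datetime.datetime.now()
        with self._lock:
            # start a fresh window once a full second has elapsed
            if now - self._window_started >= ONE_SECOND:
                self._bytes_this_second = 0
                self._window_started = now
            self._bytes_this_second += bytes_read
            over_limit = self._bytes_this_second >= self.max_bytes_per_second
        if over_limit:
            # coarse half-second sleep; very fine-grained sleep times are
            # not honored reliably by the system, as the original comment notes
            time.sleep(0.5)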

# -- requests utilities --------------------------------------------------------

@@ -310,9 +267,6 @@ def build_session(config):
_add_basic_auth(session, config)
_add_ssl(session, config)
_add_proxy(session, config)
session.nectar_bytes_this_second = 0
session.nectar_time_bytes_this_second_was_cleared = datetime.datetime.now()
session.nectar_bytes_lock = threading.Lock()

return session

@@ -350,3 +304,31 @@ def _add_proxy(session, config):

session.proxies[protocol] = url

# -- thread-safe generator queue -----------------------------------------------

class WorkerQueue(object):

Contributor: It would be helpful to have a basic doc block on the get and join methods.

Contributor Author: Fair enough


def __init__(self, iterable):

self._iterable = iterable
self._generator = _generator_wrapper(self._iterable)

self._lock = threading.Lock()
self._empty_event = threading.Event()

def get(self):
with self._lock:
try:
return next(self._generator)
except StopIteration:
self._empty_event.set()
return None

def join(self):
self._empty_event.wait()
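
Following the review thread above, one possible shape for those doc blocks (the wording is a suggestion, not part of the diff; assumes the module's threading import and _generator_wrapper helper):

class WorkerQueue(object):

    def __init__(self, iterable):
        self._iterable = iterable
        self._generator = _generator_wrapper(self._iterable)
        self._lock = threading.Lock()
        self._empty_event = threading.Event()

    def get(self):
        """
        Thread-safely fetch the next item from the wrapped iterable.

        :return: the next item, or None once the iterable is exhausted
        """
        with self._lock:
            try:
                return next(self._generator)
            except StopIteration:
                self._empty_event.set()
                return None

    def join(self):
        """
        Block the calling thread until get() has exhausted the iterable.
        """
        self._empty_event.wait()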


def _generator_wrapper(iterator):

Contributor: You can replace this with the built-in function iter

Contributor Author: I originally implemented it that way, but I wasn't sure of the consequences of casting a generator to an iterator.

# support next() for iterables, without screwing up iterators or generators
for i in iterator:
yield i
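
For what it's worth, iter() applied to a generator returns the very same object (generators are already iterators, and their __iter__ returns self), so the built-in would have been safe here. A quick check:

def numbers():
    yield 1
    yield 2

gen = numbers()
assert iter(gen) is gen       # no wrapping: iter() hands back the generator
assert next(gen) == 1         # and no items were consumed by the call above

items = [1, 2]
assert iter(items) is not items   # plain iterables do get a fresh iterator
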
8 changes: 2 additions & 6 deletions test/unit/test_threaded_downloader.py
@@ -204,10 +204,8 @@ def test_wrong_content_encoding(self):
response.raw = StringIO('abc')
session = threaded.build_session(self.config)
session.get = mock.MagicMock(return_value=response, spec_set=session.get)
report_queue = Queue()

self.downloader._fetch(req, session, report_queue)
report, f = report_queue.get()
report = self.downloader._fetch(req, session)

self.assertEqual(report.state, DOWNLOAD_SUCCEEDED)
self.assertEqual(report.bytes_downloaded, 3)
@@ -221,10 +219,8 @@ def test_normal_content_encoding(self):
response.iter_content = mock.MagicMock(return_value=['abc'], spec_set=response.iter_content)
session = threaded.build_session(self.config)
session.get = mock.MagicMock(return_value=response, spec_set=session.get)
report_queue = Queue()

self.downloader._fetch(req, session, report_queue)
report, f = report_queue.get()
report = self.downloader._fetch(req, session)

self.assertEqual(report.state, DOWNLOAD_SUCCEEDED)
self.assertEqual(report.bytes_downloaded, 3)