Merge pull request #1421 from ajdavis/spider-example
Add a web spider example to demonstrate Queue.
Showing 3 changed files with 125 additions and 0 deletions.
demos/webspider/webspider.py

@@ -0,0 +1,99 @@
import HTMLParser
import time
import urlparse
from datetime import timedelta

from tornado import httpclient, gen, ioloop, queues

base_url = 'http://www.tornadoweb.org/en/stable/'
concurrency = 10


@gen.coroutine
def get_links_from_url(url):
    """Download the page at `url` and parse it for links.

    Returned links have had the fragment after `#` removed, and have been made
    absolute so, e.g. the URL 'gen.html#tornado.gen.coroutine' becomes
    'http://www.tornadoweb.org/en/stable/gen.html'.
    """
    try:
        response = yield httpclient.AsyncHTTPClient().fetch(url)
        print('fetched %s' % url)
        urls = [urlparse.urljoin(url, remove_fragment(new_url))
                for new_url in get_links(response.body)]
    except Exception as e:
        print('Exception: %s %s' % (e, url))
        raise gen.Return([])

    raise gen.Return(urls)


def remove_fragment(url):
    scheme, netloc, url, params, query, fragment = urlparse.urlparse(url)
    return urlparse.urlunparse((scheme, netloc, url, params, query, ''))


def get_links(html):
    class URLSeeker(HTMLParser.HTMLParser):
        def __init__(self):
            HTMLParser.HTMLParser.__init__(self)
            self.urls = []

        def handle_starttag(self, tag, attrs):
            href = dict(attrs).get('href')
            if href and tag == 'a':
                self.urls.append(href)

    url_seeker = URLSeeker()
    url_seeker.feed(html)
    return url_seeker.urls


@gen.coroutine
def main():
    q = queues.Queue()
    start = time.time()
    fetching, fetched = set(), set()

    @gen.coroutine
    def fetch_url():
        current_url = yield q.get()
        try:
            if current_url in fetching:
                return

            print('fetching %s' % current_url)
            fetching.add(current_url)
            urls = yield get_links_from_url(current_url)
            fetched.add(current_url)

            for new_url in urls:
                # Only follow links beneath the base URL
                if new_url.startswith(base_url):
                    yield q.put(new_url)

        finally:
            q.task_done()

    @gen.coroutine
    def worker():
        while True:
            yield fetch_url()

    q.put(base_url)

    # Start workers, then wait for the work queue to be empty.
    for _ in range(concurrency):
        worker()
    yield q.join(timeout=timedelta(seconds=300))
    assert fetching == fetched
    print('Done in %d seconds, fetched %s URLs.' % (
        time.time() - start, len(fetched)))


if __name__ == '__main__':
    import logging
    logging.basicConfig()
    io_loop = ioloop.IOLoop.current()
    io_loop.run_sync(main)

@@ -0,0 +1,25 @@
:class:`~tornado.queues.Queue` example - a concurrent web spider
================================================================

.. currentmodule:: tornado.queues

Tornado's `tornado.queues` module implements an asynchronous producer /
consumer pattern for coroutines, analogous to the pattern implemented for
threads by the Python standard library's `queue` module.

A coroutine that yields `Queue.get` pauses until there is an item in the queue.
If the queue has a maximum size set, a coroutine that yields `Queue.put` pauses
until there is room for another item.
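
For instance, a minimal sketch of that pausing behavior (a toy
producer/consumer pair, not part of this commit, written in the same Python 2
coroutine style as the demo below):

.. code-block:: python

    from tornado import gen, ioloop, queues


    @gen.coroutine
    def main():
        q = queues.Queue(maxsize=2)

        @gen.coroutine
        def consumer():
            while True:
                item = yield q.get()  # pauses while the queue is empty
                print('consumed %s' % item)
                q.task_done()

        consumer()  # start the consumer; it pauses at q.get()
        for i in range(5):
            # With maxsize=2, put pauses whenever the queue is full and
            # resumes once the consumer has made room.
            yield q.put(i)
        yield q.join()  # wait until every item has been handled


    ioloop.IOLoop.current().run_sync(main)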

A `~Queue` maintains a count of unfinished tasks, which begins at zero.
`~Queue.put` increments the count; `~Queue.task_done` decrements it.
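
A small illustration of the counter's bookkeeping (again a sketch, not part
of this commit; `show_counter` is a made-up name):

.. code-block:: python

    from tornado import gen, ioloop, queues


    @gen.coroutine
    def show_counter():
        q = queues.Queue()
        q.put('a')  # count: 0 -> 1 (unbounded queue, so this resolves at once)
        q.put('b')  # count: 1 -> 2

        yield q.get()  # removes an item; the count is unchanged
        q.task_done()  # count: 2 -> 1
        yield q.get()
        q.task_done()  # count: 1 -> 0

        yield q.join()  # resolves immediately: no unfinished tasks remain


    ioloop.IOLoop.current().run_sync(show_counter)

Calling `~Queue.task_done` more times than items were put raises a
`ValueError`, as with the standard library's `queue.Queue`.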

In the web-spider example here, the queue begins containing only base_url. When
a worker fetches a page it parses the links and puts new ones in the queue,
then calls `~Queue.task_done` to decrement the counter once. Eventually, a
worker fetches a page whose URLs have all been seen before, and there is also
no work left in the queue. Thus that worker's call to `~Queue.task_done`
decrements the counter to zero. The main coroutine, which is waiting for
`~Queue.join`, is unpaused and finishes.

.. literalinclude:: ../../demos/webspider/webspider.py