Seems to be working now

commit 58bb4b6e05400491528d51d4d6df13a03d9d7649 (parent b9604d8)
authored by @jcgregorio
Showing with 28 additions and 20 deletions.
  1. +4 −1 planet.py
  2. +24 −19 planet/spider.py
planet.py (5 changed lines)
@@ -54,7 +54,10 @@
     if not offline:
         from planet import spider
-        spider.spiderPlanet(only_if_new=only_if_new)
+        try:
+            spider.spiderPlanet(only_if_new=only_if_new)
+        except Exception, e:
+            print e
     from planet import splice
     doc = splice.splice()
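The planet.py change above is a simple containment pattern: if spidering fails, the error is printed and execution still falls through to the splice step, so output can be rebuilt from whatever is already cached. A minimal, self-contained sketch of that control flow in the same Python 2 syntax, where fetch_all and build_output are invented stand-ins for spider.spiderPlanet and splice.splice:

    # Sketch only; fetch_all/build_output are hypothetical stand-ins for
    # spider.spiderPlanet and splice.splice in the diff above.
    def fetch_all():
        raise IOError("network down")   # simulate a spider failure

    def build_output():
        return "<feed/>"                # splicing still runs on cached data

    try:
        fetch_all()
    except Exception, e:                # Python 2 except syntax, as in the codebase
        print e                         # report the failure instead of aborting
    doc = build_output()
    print doc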
planet/spider.py (43 changed lines)
@@ -330,40 +330,45 @@ def spiderPlanet(only_if_new = False):
     global index
     index = True
-    if config.spider_threads():
-        import Queue
+    if int(config.spider_threads()):
+        from Queue import Queue, Empty
         from threading import Thread
         import httplib2
         work_queue = Queue()
         awaiting_parsing = Queue()
-        def _spider_proc():
+        def _spider_proc(thread_index):
             h = httplib2.Http(config.http_cache_directory())
-            while True:
-                # The non-blocking get will throw an exception when the queue
-                # is empty which will terminate the thread.
-                uri = work_queue.get(block=False):
-                log.info("Fetching %s", uri)
-                (resp, content) = h.request(uri)
-                awaiting_parsing.put(block=True, (resp, content, uri))
+            try:
+                while True:
+                    # The non-blocking get will throw an exception when the queue
+                    # is empty which will terminate the thread.
+                    uri = work_queue.get(block=False)
+                    log.info("Fetching %s via %d", uri, thread_index)
+                    (resp, content) = h.request(uri)
+                    awaiting_parsing.put(block=True, item=(resp, content, uri))
+            except Empty, e:
+                log.info("Thread %d finished", thread_index)
+                pass
         # Load the work_queue with all the HTTP(S) uris.
-        map(work_queue.put, [uri for uri in config.subscriptions if _is_http_uri(uri)])
+        map(work_queue.put, [uri for uri in config.subscriptions() if _is_http_uri(uri)])
         # Start all the worker threads
-        threads = dict([(i, Thread(target=_spider_proc)) for i in range(config.spider_threads())])
+        threads = dict([(i, Thread(target=_spider_proc, args=(i,))) for i in range(int(config.spider_threads()))])
         for t in threads.itervalues():
             t.start()
         # Process the results as they arrive
-        while work_queue.qsize() and awaiting_parsing.qsize() and threads:
-            item = awaiting_parsing.get(False)
-            if not item and threads:
+        while work_queue.qsize() or awaiting_parsing.qsize() or threads:
+            if awaiting_parsing.qsize() == 0 and threads:
                 time.sleep(1)
-            while item:
+            while awaiting_parsing.qsize():
+                item = awaiting_parsing.get(False)
                 try:
                     (resp_headers, content, uri) = item
+                    log.info("Parsing pre-fetched %s", uri)
                     spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
                 except Exception, e:
                     import sys, traceback
@@ -372,15 +377,15 @@ def _spider_proc():
                     for line in (traceback.format_exception_only(type, value) +
                                  traceback.format_tb(tb)):
                         log.error(line.rstrip())
-                    item = awaiting_parsing.get(False)
-            for index in threads:
+            for index in threads.keys():
                 if not threads[index].isAlive():
                     del threads[index]
+        log.info("Finished threaded part of processing.")
     planet.setTimeout(config.feed_timeout())
     # Process non-HTTP uris if we are threading, otherwise process *all* uris here.
-    unthreaded_work_queue = [uri for uri in config.subscriptions if not config.spider_threads() or not _is_http_uri(uri)]
+    unthreaded_work_queue = [uri for uri in config.subscriptions() if not int(config.spider_threads()) or not _is_http_uri(uri)]
     for feed in unthreaded_work_queue:
         try:
             spiderFeed(feed, only_if_new=only_if_new)
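Taken together, the spider.py hunks implement a small fetch pool: worker threads drain a Queue of HTTP(S) URIs, push (response, content, uri) tuples onto a second queue, and exit when a non-blocking get raises Empty, while the main thread parses results as they arrive and prunes threads that have finished. A stripped-down sketch of that work-queue / results-queue pattern, under the same Python 2 / Queue-module assumptions (fake_fetch and the example URIs are invented for illustration; the real code uses httplib2 and config.subscriptions()):

    import time
    from Queue import Queue, Empty
    from threading import Thread

    work_queue = Queue()
    results = Queue()

    def fake_fetch(uri):
        time.sleep(0.1)                         # stand-in for httplib2.Http().request(uri)
        return ("200 OK", "<feed>%s</feed>" % uri)

    def worker(thread_index):
        try:
            while True:
                # Non-blocking get: Empty is raised once the queue is drained,
                # which is how a worker knows to exit (as in _spider_proc).
                uri = work_queue.get(block=False)
                results.put((fake_fetch(uri), uri))
        except Empty:
            pass                                # no work left; this thread finishes

    for uri in ["http://example.com/a", "http://example.com/b", "http://example.com/c"]:
        work_queue.put(uri)

    threads = dict((i, Thread(target=worker, args=(i,))) for i in range(2))
    for t in threads.itervalues():
        t.start()

    # Consume results while any work, results, or live workers remain,
    # mirroring the loop in spiderPlanet above.
    while work_queue.qsize() or results.qsize() or threads:
        if results.qsize() == 0 and threads:
            time.sleep(0.1)
        while results.qsize():
            ((resp, content), uri) = results.get(False)
            print "parsed", uri
        for i in threads.keys():                # keys() returns a copy in Python 2, safe to delete from
            if not threads[i].isAlive():
                del threads[i]

One caveat of the pattern itself: because the workers use a non-blocking get, every URI must be loaded into work_queue before the threads start, which is exactly the order the diff uses.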