Some spider refactoring. Remove NullTask object.
lorien committed May 23, 2015
1 parent 63a9a73 commit fff8cc6
Showing 5 changed files with 65 additions and 134 deletions.
2 changes: 1 addition & 1 deletion grab/spider/__init__.py
@@ -1,4 +1,4 @@
 from grab.spider.base import Spider # noqa
 from grab.spider.data import Data # noqa
-from grab.spider.task import Task, inline_task, NullTask # noqa
+from grab.spider.task import Task, inline_task # noqa
 from grab.spider.error import * # noqa
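
Migration note: after this commit NullTask can no longer be imported from grab.spider. A minimal sketch of the replacement pattern; the spider class and URLs are hypothetical, and the `delay` parameter comes from the Task.__init__ signature shown in grab/spider/task.py below:

from grab.spider import Spider, Task

class ExampleSpider(Spider):
    # Hypothetical spider, for illustration only
    initial_urls = ['http://example.com/']

    def task_initial(self, grab, task):
        # Where a spider previously enqueued NullTask(sleep=0.3) as a
        # do-nothing placeholder, a regular Task with its own `delay`
        # argument (seconds) now covers delayed scheduling.
        yield Task('page', url='http://example.com/page', delay=0.3)

    def task_page(self, grab, task):
        pass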
139 changes: 51 additions & 88 deletions grab/spider/base.py
@@ -21,7 +21,7 @@
 from grab.error import GrabInvalidUrl
 from grab.spider.error import (SpiderError, SpiderMisuseError, FatalError,
                                NoTaskHandler, NoDataHandler)
-from grab.spider.task import Task, NullTask
+from grab.spider.task import Task
 from grab.spider.data import Data
 from grab.spider.transport.multicurl import MulticurlTransport
 from grab.proxylist import ProxyList, BaseProxySource
@@ -282,60 +282,31 @@ def add_task(self, task, raise_error=False):
         else:
             task.priority_is_custom = True
 
-        if not isinstance(task, NullTask):
-            try:
-                if not task.url.startswith(('http://', 'https://', 'ftp://',
-                                            'file://', 'feed://')):
-                    if self.base_url is None:
-                        msg = 'Could not resolve relative URL because base_url ' \
-                              'is not specified. Task: %s, URL: %s'\
-                              % (task.name, task.url)
-                        raise SpiderError(msg)
-                    else:
-                        task.url = urljoin(self.base_url, task.url)
-                        # If task has grab_config object then update it too
-                        if task.grab_config:
-                            task.grab_config['url'] = task.url
-            except Exception as ex:
-                self.stat.append('task-with-invalid-url', task.url)
-                if raise_error:
-                    raise
-                else:
-                    logger.error('', exc_info=ex)
-                return False
+        try:
+            if not task.url.startswith(('http://', 'https://', 'ftp://',
+                                        'file://', 'feed://')):
+                if self.base_url is None:
+                    msg = 'Could not resolve relative URL because base_url ' \
+                          'is not specified. Task: %s, URL: %s'\
+                          % (task.name, task.url)
+                    raise SpiderError(msg)
+                else:
+                    task.url = urljoin(self.base_url, task.url)
+                    # If task has grab_config object then update it too
+                    if task.grab_config:
+                        task.grab_config['url'] = task.url
+        except Exception as ex:
+            self.stat.append('task-with-invalid-url', task.url)
+            if raise_error:
+                raise
+            else:
+                logger.error('', exc_info=ex)
+            return False
 
         # TODO: keep original task priority if it was set explicitly
         self.taskq.put(task, task.priority, schedule_time=task.schedule_time)
         return True
 
-    def load_initial_urls(self):
-        """
-        Create initial tasks from `self.initial_urls`.
-        Tasks are created with name "initial".
-        """
-
-        if self.initial_urls:
-            for url in self.initial_urls:
-                self.add_task(Task('initial', url=url))
-
-    def setup_default_queue(self):
-        """
-        If task queue is not configured explicitly
-        then create task queue with default parameters
-        This method is not the same as `self.setup_queue` because
-        `self.setup_queue` works by default with in-memory queue.
-        You can override `setup_default_queue` in your custom
-        Spider and use other storage engines for you
-        default task queue.
-        """
-
-        # If queue is still not configured
-        # then configure it with default backend
-        if self.taskq is None:
-            self.setup_queue()
-
     def process_task_generator(self):
         """
         Load new tasks from `self.task_generator_object`
@@ -374,22 +345,22 @@ def process_task_generator(self):
                              'Disabling it')
                 self.task_generator_enabled = False
 
-    def init_task_generator(self):
+    def start_task_generator(self):
         """
-        Process `initial_urls` and `task_generator`.
-        Generate first portion of tasks.
+        Process `self.initial_urls` list and `self.task_generator`
+        method. Generate first portion of tasks.
+
+        TODO: task generator should work in separate OS process
         """
 
-        self.task_generator_object = self.task_generator()
-        self.task_generator_enabled = True
-
         logger_verbose.debug('Processing initial urls')
-        self.load_initial_urls()
+        if self.initial_urls:
+            for url in self.initial_urls:
+                self.add_task(Task('initial', url=url))
+
+        self.task_generator_object = self.task_generator()
+        self.task_generator_enabled = True
 
-        # Initial call to task generator
-        # before main cycle
+        # Initial call to task generator before spider has started working
        self.process_task_generator()
 
     def load_new_task(self):
@@ -411,10 +382,10 @@ def load_new_task(self):
         else:
             # Temporarily hack which force slave crawler
             # to wait 5 seconds for new tasks, this solves
-            # the problem that sometimes slave crawler stop
-            # its work because it could not receive new
+            # the problem: sometimes slave crawler stops
+            # working because it could not receive new
             # tasks immediately
-            if not self.transport.active_task_number():
+            if not self.transport.get_active_threads_number():
                 if time.time() - start < 5:
                     time.sleep(0.1)
                     logger.debug('Slave sleeping')
@@ -428,8 +399,6 @@ def load_new_task(self):
 
     def process_task_counters(self, task):
         task.network_try_count += 1
-        if task.task_try_count == 0:
-            task.task_try_count = 1
 
     def create_grab_instance(self, **kwargs):
         # Back-ward compatibility for deprecated `grab_config` attribute
@@ -775,19 +744,23 @@ def run(self):
         self.timer.start('total')
         self.transport = MulticurlTransport(self.thread_number)
         try:
-            self.setup_default_queue()
+            # Run custom things defined by this specific spider
+            # By defaut it does nothing
             self.prepare()
 
-            self.timer.start('task_generator')
-            if not self.slave:
-                self.init_task_generator()
-            self.timer.stop('task_generator')
+            # Setup task queue if it has not been configured yet
+            if self.taskq is None:
+                self.setup_queue()
+
+            # Initiate task generate. Only in main process!
+            with self.timer.log_time('task_generator'):
+                if not self.slave:
+                    self.start_task_generator()
 
             while self.work_allowed:
-                self.timer.start('task_generator')
-                if self.task_generator_enabled:
-                    self.process_task_generator()
-                self.timer.stop('task_generator')
+                with self.timer.log_time('task_generator'):
+                    if self.task_generator_enabled:
+                        self.process_task_generator()
 
                 free_threads = self.transport.get_free_threads_number()
                 if free_threads:
@@ -801,7 +774,7 @@ def run(self):
                     for x in six.moves.range(5):
                         task = self.load_new_task()
                         if task is None:
-                            if not self.transport.active_task_number():
+                            if not self.transport.get_active_threads_number():
                                 self.process_task_generator()
                         elif task is True:
                             # If only delayed tasks in queue
@@ -810,27 +783,19 @@ def run(self):
                             # If got some task
                             break
 
-                    if not task:
-                        if not self.transport.active_task_number():
+                    if task is None:
+                        if not self.transport.get_active_threads_number():
                             logger_verbose.debug('Network transport has no '
                                                  'active tasks')
                             if not self.task_generator_enabled:
                                 self.stop()
                         else:
                             logger_verbose.debug(
                                 'Transport active tasks: %d' %
-                                self.transport.active_task_number())
-                    elif isinstance(task, NullTask):
-                        logger_verbose.debug('Got NullTask')
-                        if not self.transport.active_task_number():
-                            if task.sleep:
-                                logger.debug('Got NullTask with sleep '
-                                             'instruction. Sleeping for'
-                                             ' %.2f seconds' % task.sleep)
-                                time.sleep(task.sleep)
+                                self.transport.get_active_threads_number())
                     elif isinstance(task, bool) and (task is True):
                         # Take some sleep to not load CPU
-                        if not self.transport.active_task_number():
+                        if not self.transport.get_active_threads_number():
                             time.sleep(0.1)
                     else:
                         logger_verbose.debug('Got new task from task queue: %s'
@@ -947,8 +912,6 @@ def process_handler_result(self, result, task=None):
                                             task)
         elif result is None:
             pass
-        elif isinstance(result, NullTask):
-            pass
         else:
             raise SpiderError('Unknown result type: %s' % result)

@@ -983,7 +946,7 @@ def process_next_page(self, grab, task, xpath,
         page = task.get('page', 1) + 1
         grab2 = grab.clone()
         grab2.setup(url=url)
-        task2 = task.clone(task_try_count=0, grab=grab2,
+        task2 = task.clone(task_try_count=1, grab=grab2,
                            page=page, **kwargs)
         self.add_task(task2)
         return True
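A note on the timing change in run(): paired timer.start('task_generator') / timer.stop('task_generator') calls became `with self.timer.log_time('task_generator')` blocks. A sketch of how such a helper can be written, assuming the timer keeps the start()/stop() API visible in this diff (the real implementation lives in Grab's stat/timer code and may differ):

from contextlib import contextmanager

@contextmanager
def log_time(timer, key):
    # Start the named timer and guarantee it is stopped even if the
    # wrapped block raises: the advantage over manual start()/stop() pairs.
    timer.start(key)
    try:
        yield
    finally:
        timer.stop(key)

With the old start/stop pairs, an exception in process_task_generator() would leave the 'task_generator' timer running; the context-manager form cannot.
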
17 changes: 1 addition & 16 deletions grab/spider/task.py
@@ -16,7 +16,7 @@ class Task(BaseTask):
 
     def __init__(self, name=None, url=None, grab=None, grab_config=None,
                  priority=None, priority_is_custom=True,
-                 network_try_count=0, task_try_count=0,
+                 network_try_count=0, task_try_count=1,
                  disable_cache=False, refresh_cache=False,
                  valid_status=[], use_proxylist=True,
                  cache_timeout=None, delay=0,
@@ -246,21 +246,6 @@ def get_fallback_handler(self, spider):
         return None
 
 
-class NullTask(BaseTask):
-    def __init__(self, name='initial', sleep=0, priority=None,
-                 priority_is_custom=True, network_try_count=0,
-                 task_try_count=0):
-        self.name = name
-        self.sleep = sleep
-        self.priority = None
-        self.priority_is_custom = False
-        self.network_try_count = network_try_count
-        self.task_try_count = task_try_count
-
-        self.schedule_time = None
-        self.original_delay = None
-
-
 def inline_task(f):
     def wrap(self, grab, task):
         origin_task_generator = f(self, grab, task)
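The task_try_count default moving from 0 to 1 changes the counting convention: a freshly created Task is already on its first attempt. That is why the `if task.task_try_count == 0` fixup disappears from process_task_counters() and why process_next_page() now clones with task_try_count=1. A small sketch of the new convention (hypothetical URL; assumes Task stores the constructor defaults as attributes, as its signature above suggests):

from grab.spider import Task

task = Task('page', url='http://example.com/')
assert task.task_try_count == 1       # first attempt is counted from 1
task2 = task.clone(task_try_count=1)  # a next-page task restarts the counter
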
20 changes: 11 additions & 9 deletions grab/spider/transport/multicurl.py
@@ -6,16 +6,16 @@
 
 
 class MulticurlTransport(object):
-    def __init__(self, thread_number):
-        self.thread_number = thread_number
+    def __init__(self, socket_number):
+        self.socket_number = socket_number
         self.multi = pycurl.CurlMulti()
         self.multi.handles = []
         self.freelist = []
         self.registry = {}
         self.connection_count = {}
 
         # Create curl instances
-        for x in six.moves.range(self.thread_number):
+        for x in six.moves.range(self.socket_number):
             curl = pycurl.Curl()
             self.connection_count[id(curl)] = 0
             self.freelist.append(curl)
@@ -27,8 +27,8 @@ def ready_for_task(self):
     def get_free_threads_number(self):
         return len(self.freelist)
 
-    def active_task_number(self):
-        return self.thread_number - len(self.freelist)
+    def get_active_threads_number(self):
+        return self.socket_number - len(self.freelist)
 
     def process_connection_count(self, curl):
         curl_id = id(curl)
@@ -67,8 +67,7 @@ def process_task(self, task, grab, grab_config_backup):
     def process_handlers(self):
         # Ok, frankly I have real bad understanding of
         # how to deal with multicurl sockets ;-)
-        # It is a sort of miracle that Grab is used by some people
-        # and they managed to get job done
+        # It is a sort of miracle that Grab actually works
         rlist, wlist, xlist = self.multi.fdset()
         if rlist or wlist or xlist:
             timeout = self.multi.timeout()
@@ -133,8 +132,11 @@ def iterate_results(self):
                 del self.registry[curl_id]
                 grab.transport.curl = None
 
-                yield {'ok': ok, 'emsg': emsg, 'grab': grab,
-                       'grab_config_backup': grab_config_backup, 'task': task}
+                yield {'ok': ok,
+                       'emsg': emsg,
+                       'grab': grab,
+                       'grab_config_backup': grab_config_backup,
+                       'task': task}
 
                 self.multi.remove_handle(curl)
                 curl.reset()
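The transport renames align its vocabulary with the rest of the spider: the constructor argument is now socket_number (each slot is a pycurl handle, not an OS thread), and active_task_number() becomes get_active_threads_number(), mirroring the existing get_free_threads_number(). A usage sketch based only on the methods shown in this diff (requires pycurl):

from grab.spider.transport.multicurl import MulticurlTransport

transport = MulticurlTransport(10)             # pool of 10 curl handles
free = transport.get_free_threads_number()     # idle handles
busy = transport.get_active_threads_number()   # socket_number minus free
assert free + busy == 10
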
21 changes: 1 addition & 20 deletions test/spider.py
@@ -1,5 +1,5 @@
 import six
-from grab.spider import Spider, Task, NullTask
+from grab.spider import Spider, Task
 from grab.spider.error import SpiderError, FatalError
 import os
 import signal
@@ -203,25 +203,6 @@ def check_task_limits(self, task):
                                   fallback_name='fallback_zz'))
         self.assertRaises(SpiderError, bot.run)
 
-    def test_null_task(self):
-        class TestSpider(Spider):
-            pass
-
-        bot = TestSpider()
-        bot.setup_queue()
-        bot.add_task(NullTask(sleep=0.3))
-
-        points = []
-
-        def sleep(arg):
-            points.append(arg)
-
-        with mock.patch('time.sleep', sleep):
-            bot.run()
-
-        self.assertEqual(points, [0.3])
-
     def test_fatal_error(self):
         class TestSpider(Spider):
             def task_page(self, grab, task):
