initial commit

0 parents commit c86c4f1a0c59c58130cb1138a07a5028ca2f76c3 @rolando committed Aug 29, 2011
Showing with 264 additions and 0 deletions.
  1. +27 −0 LICENSE
  2. +52 −0 README
  3. 0 scrapy_redis/__init__.py
  4. +44 −0 scrapy_redis/dupefilter.py
  5. +32 −0 scrapy_redis/pipelines.py
  6. +40 −0 scrapy_redis/queue.py
  7. +57 −0 scrapy_redis/scheduler.py
  8. +12 −0 setup.py
27 LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) Rolando Espinoza La fuente
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification,
+are permitted provided that the following conditions are met:
+
+ 1. Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+ 2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+
+ 3. Neither the name of scrapy-redis nor the names of its contributors may be used
+ to endorse or promote products derived from this software without
+ specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
52 README
@@ -0,0 +1,52 @@
+Redis-based components for Scrapy
+=================================
+
+This is an initial work on Scrapy-Redis integration; it is not production-tested.
+Use it at your own risk!
+
+Features:
+* Distributed crawling/scraping
+* Distributed post-processing
+
+Requirements:
+* Scrapy >= 0.13 (development version)
+* redis-py (tested on 2.4.9)
+* redis server (tested on 2.2-2.4)
+
+Available Scrapy components:
+* Scheduler
+* Duplication Filter
+* Item Pipeline
+
+
+Running the example project
+---------------------------
+
+You can test the functionality by following these steps:
+
+1. Set up the scrapy_redis package in your PYTHONPATH
+
+2. Run the crawler for the first time, then stop it
+
+ $ cd examples
+ $ scrapy crawl dmoz
+ ^C
+
+3. Run the crawler again to resume the stopped crawl
+
+ $ scrapy crawl dmoz
+ ... [dmoz] DEBUG: Resuming crawl (9019 requests scheduled)
+
+4. Start one or more additional Scrapy crawlers
+
+ $ scrapy crawl dmoz
+ ... [dmoz] DEBUG: Resuming crawl (8712 requests scheduled)
+
+5. Start one or more post-processing workers
+
+ $ python process_items.py
+ Processing: Kilani Giftware (http://www.dmoz.org/Computers/Shopping/Gifts/)
+ Processing: NinjaGizmos.com (http://www.dmoz.org/Computers/Shopping/Gifts/)
+ ...
+
+That's it.
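The example project itself is not part of this commit, but judging from the from_settings methods in scheduler.py, dupefilter.py and pipelines.py below, a project would presumably be wired up with settings roughly like the following sketch (REDIS_HOST, REDIS_PORT, SCHEDULER_PERSIST and SCHEDULER_QUEUE_KEY are the names the code reads; SCHEDULER and ITEM_PIPELINES are standard Scrapy settings, and DUPEFILTER_CLASS is only needed if the dupefilter is used standalone with Scrapy's default scheduler):

    # settings.py sketch (assumed wiring, not included in this commit)
    SCHEDULER = 'scrapy_redis.scheduler.Scheduler'
    ITEM_PIPELINES = ['scrapy_redis.pipelines.RedisPipeline']
    # DUPEFILTER_CLASS = 'scrapy_redis.dupefilter.RFPDupeFilter'  # only for standalone use
    SCHEDULER_PERSIST = True                    # keep queue and dupefilter between runs
    SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
    REDIS_HOST = 'localhost'
    REDIS_PORT = 6379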
0 scrapy_redis/__init__.py
No changes.
44 scrapy_redis/dupefilter.py
@@ -0,0 +1,44 @@
+import redis
+import time
+from scrapy.dupefilter import BaseDupeFilter
+from scrapy.utils.request import request_fingerprint
+
+
+class RFPDupeFilter(BaseDupeFilter):
+ """Redis-based request duplication filter"""
+
+ def __init__(self, server, key):
+ """Initialize duplication filter
+
+ Parameters:
+ server -- Redis connection
+ key -- redis key to store fingerprints
+
+ """
+ self.server = server
+ self.key = key
+
+ @classmethod
+ def from_settings(cls, settings):
+ host = settings.get('REDIS_HOST', 'localhost')
+ port = settings.get('REDIS_PORT', 6379)
+ server = redis.Redis(host, port)
+ # create one-time key. needed to support to use this
+ # class as standalone dupefilter with scrapy's default scheduler
+ # if scrapy passes spider on open() method this wouldn't be needed
+ key = "dupefilter:%s" % int(time.time())
+ return cls(server, key)
+
+ def request_seen(self, request):
+ fp = request_fingerprint(request)
+ added = self.server.sadd(self.key, fp)
+ return not added
+
+ def close(self, reason):
+ """Delete data on close. Called by scrapy's scheduler"""
+ self.clear()
+
+ def clear(self):
+ """Clears fingerprints data"""
+ self.server.delete(self.key)
+
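As a quick illustration of the SADD-based check in request_seen(): the first call stores the fingerprint and returns False, and any later call with the same request returns True. A minimal sketch, assuming a local Redis server (the key name and URL are only examples):

    import redis
    from scrapy.http import Request
    from scrapy_redis.dupefilter import RFPDupeFilter

    df = RFPDupeFilter(redis.Redis('localhost', 6379), 'dupefilter:test')
    request = Request('http://www.dmoz.org/')
    print(df.request_seen(request))  # False: SADD stored a new fingerprint
    print(df.request_seen(request))  # True: SADD returned 0 for the duplicate
    df.clear()                       # delete the fingerprint set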
32 scrapy_redis/pipelines.py
@@ -0,0 +1,32 @@
+import redis
+
+from twisted.internet.threads import deferToThread
+from scrapy.utils.serialize import ScrapyJSONEncoder
+
+
+class RedisPipeline(object):
+ """Pushes serialized item into a redis list/queue"""
+
+ def __init__(self, host, port):
+ self.server = redis.Redis(host, port)
+ self.encoder = ScrapyJSONEncoder()
+
+ @classmethod
+ def from_settings(cls, settings):
+ host = settings.get('REDIS_HOST', 'localhost')
+ port = settings.get('REDIS_PORT', 6379)
+ return cls(host, port)
+
+ def process_item(self, item, spider):
+ return deferToThread(self._process_item, item, spider)
+
+ def _process_item(self, item, spider):
+ key = self.item_key(item, spider)
+ data = self.encoder.encode(dict(item))
+ self.server.rpush(key, data)
+ return item
+
+ def item_key(self, item, spider):
+ """Returns redis key based on given spider"""
+ return "%s:items" % spider.name
+
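RedisPipeline only RPUSHes JSON-encoded items onto a "<spider>:items" list; the post-processing workers from step 5 of the README consume that list. The process_items.py script is not included in this commit, so the following is only a rough sketch of such a worker (the spider name 'dmoz' and the item fields are assumptions based on the README output):

    import json
    import redis

    server = redis.Redis('localhost', 6379)
    while True:
        # BLPOP blocks until the pipeline pushes another serialized item
        key, data = server.blpop(['dmoz:items'])
        item = json.loads(data)
        print("Processing: %s (%s)" % (item.get('name'), item.get('url')))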
40 scrapy_redis/queue.py
@@ -0,0 +1,40 @@
+import marshal
+from scrapy.utils.reqser import request_to_dict, request_from_dict
+
+
+class SpiderQueue(object):
+ """Per-spider queue abstraction on top of redis using sorted set"""
+
+ def __init__(self, server, spider, key):
+ """Initialize per-spider redis queue
+
+ Parameters:
+ server -- redis connection
+ spider -- spider instance
+ key -- key for this queue (e.g. "%(spider)s:queue")
+
+ """
+ self.server = server
+ self.spider = spider
+ self.key = key % {'spider': spider.name}
+
+ def __len__(self):
+ return self.server.zcard(self.key)
+
+ def push(self, request):
+ data = marshal.dumps(request_to_dict(request, self.spider))
+ pairs = {data: -request.priority}
+ self.server.zadd(self.key, **pairs)
+
+ def pop(self):
+ # use atomic range/remove using multi/exec
+ pipe = self.server.pipeline()
+ pipe.multi()
+ pipe.zrange(self.key, 0, 0).zremrangebyrank(self.key, 0, 0)
+ results, count = pipe.execute()
+ if results:
+ return request_from_dict(marshal.loads(results[0]), self.spider)
+
+ def clear(self):
+ self.server.delete(self.key)
+
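Because push() scores each marshal-serialized request with the negative of its priority and pop() always takes index 0 of the sorted set, higher-priority requests are returned first. A small sketch of that behaviour, assuming a local Redis server and a stub object standing in for a real spider:

    import redis
    from scrapy.http import Request
    from scrapy_redis.queue import SpiderQueue

    class StubSpider(object):
        name = 'dmoz'

    q = SpiderQueue(redis.Redis('localhost', 6379), StubSpider(), '%(spider)s:queue')
    q.push(Request('http://example.com/low', priority=0))
    q.push(Request('http://example.com/high', priority=10))
    print(q.pop().url)  # http://example.com/high (score -10 sorts before 0)
    q.clear()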
57 scrapy_redis/scheduler.py
@@ -0,0 +1,57 @@
+import redis
+from scrapy_redis.queue import SpiderQueue
+from scrapy_redis.dupefilter import RFPDupeFilter
+
+
+# default values
+REDIS_HOST = 'localhost'
+REDIS_PORT = 6379
+SCHEDULER_PERSIST = False
+QUEUE_KEY = '%(spider)s:requests'
+DUPEFILTER_KEY = '%(spider)s:dupefilter'
+
+
+class Scheduler(object):
+ """Redis-based scheduler"""
+
+ def __init__(self, server, persist, queue_key):
+ self.server = server
+ self.persist = persist
+ self.queue_key = queue_key
+
+ def __len__(self):
+ return len(self.queue)
+
+ @classmethod
+ def from_settings(cls, settings):
+ host = settings.get('REDIS_HOST', REDIS_HOST)
+ port = settings.get('REDIS_PORT', REDIS_PORT)
+ persist = settings.get('SCHEDULER_PERSIST', SCHEDULER_PERSIST)
+ queue_key = settings.get('SCHEDULER_QUEUE_KEY', QUEUE_KEY)
+ server = redis.Redis(host, port)
+ return cls(server, persist, queue_key)
+
+ def open(self, spider):
+ self.spider = spider
+ self.queue = SpiderQueue(self.server, spider, self.queue_key)
+ self.df = RFPDupeFilter(self.server, DUPEFILTER_KEY % {'spider': spider.name})
+ # notice if there are requests already in the queue
+ if len(self.queue):
+ spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
+
+ def close(self, reason):
+ if not self.persist:
+ self.df.clear()
+ self.queue.clear()
+
+ def enqueue_request(self, request):
+ if not request.dont_filter and self.df.request_seen(request):
+ return
+ self.queue.push(request)
+
+ def next_request(self):
+ return self.queue.pop()
+
+ def has_pending_requests(self):
+ return len(self) > 0
+
12 setup.py
@@ -0,0 +1,12 @@
+from distutils.core import setup
+
+setup(name='scrapy-redis',
+      version='0.1',
+      description='Redis-based components for Scrapy',
+      author='Rolando Espinoza La fuente',
+      author_email='darkrho@gmail.com',
+      url='http://github.com/darkrho/scrapy-redis',
+      packages=['scrapy_redis'],
+      license='BSD',
+      #install_requires=['Scrapy>=0.13'],
+      )

4 comments on commit c86c4f1

hello

hello
I get an error:

    File "/usr/local/lib/python2.6/dist-packages/scrapy_redis/queue.py", line 34
        pipe.exec()
                  ^
    SyntaxError: invalid syntax

What causes this?

Oh, no, this is wrong:

    pipe.multi()
    exceptions.AttributeError: 'Pipeline' object has no attribute 'multi'

Owner

rolando replied Feb 17, 2013

Which version of redis are you using?
