
Make requests via message queues #3477

Open
octohedron opened this issue Oct 29, 2018 · 8 comments


octohedron commented Oct 29, 2018

I'm trying to pass requests to the spider externally, via message queues, and keep it running forever.

I found some projects made by others, but none of them work with the current version of Scrapy, so I'm trying to fix them or find a way to do it myself.

So far, I've found that others get a reference to the scheduler through the spider in a middleware, like this:

class MyMiddleware(object):
    # [...]
    def ensure_init(self, spider):
        self.spider = spider
        self.scheduler = spider.crawler.engine.slot.scheduler
    # [...]
    def process_response(self, request, response, spider):
        self.ensure_init(spider)
        return response

Then, in a custom "Scheduler" class:

class MyScheduler(object):
    # [...]
    def open(self, spider):
        self.spider = spider
    # [...]
    def next_request(self):
        return self.spider.make_request(page)

And then, in the spider

class TestSpider(scrapy.Spider):
    # [...]
    def make_request(self, page):
        # This works, i.e. prints "Making request"
        logging.info("Making request")
        yield scrapy.Request(url=page, callback=self.parse)
    # [...]
    def parse(self, response):
        # This never gets printed
        logging.info("Got response")

This is the simplest example I could put together, but you get the idea; the real code is much messier, which makes it harder to fix.

The issue is that, in theory, this should work. I don't know exactly when that next_request method gets called, but it does get called, because it reaches the make_request method in the spider. The only problem is that it never gets to parse (the request callback) in the spider, and I don't know why.
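
For reference, my understanding is that the engine expects next_request to hand back a single scrapy.Request (or None), whereas my make_request above is a generator because of the yield. A version that returns the request directly would look like this (just a sketch of what I plan to try next, not verified):

class TestSpider(scrapy.Spider):
    # [...]
    def make_request(self, page):
        logging.info("Making request")
        # Return a single Request object instead of a generator
        return scrapy.Request(url=page, callback=self.parse)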

I also tried connecting to the message queue directly in the spider, which in theory should work, but it doesn't. For example:

import pika

class TestSpider(scrapy.Spider):
    rbmqrk = 'test'
    # Blocking connection to the local RabbitMQ broker
    rmq = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    # Init channel
    rmqc = rmq.channel()

    def __init__(self, *args, **kwargs):
        super(TestSpider, self).__init__(*args, **kwargs)
        self.rmqc.queue_declare(queue=self.rbmqrk)

    def start_requests(self):
        self.rmqc.basic_consume(self.callback, self.rbmqrk)

    def callback(self, channel, method_frame, header_frame, body):
        # This gets printed! It works up to here.
        logging.info(body)

Up to there, everything works fine: the body of the message received from the queue gets printed.

But if we try to make or yield a request from the callback method in the spider, it won't work. For example:

class TestSpider(scrapy.Spider):
    # [...] same as above...

    def start_requests(self):
        # Same as above
        self.rmqc.basic_consume(self.callback, self.rbmqrk)

    def callback(self, channel, method_frame, header_frame, body):
        # This DOESN'T get printed
        logging.info(body)
        yield scrapy.Request(url=body.decode(), callback=self.parse)
        # This wouldn't work either (already tried as well)
        # return scrapy.Request(url=body.decode(), callback=self.parse)

    def parse(self, response):
        # We never get here, this doesn't get printed either
        logging.info("Got response")

Unfortunately, yielding a generator or a request from the callback does not result in the spider making a request.

As you can see, I've tried several things without luck. All I need is to be able to make requests from the messages in the message queue. I'm not sure if there's a bug in Scrapy or something I can fix in my code, but I'd love some input on this before I start digging deeper into the Scrapy code myself.

@nicksherron

Did you figure this out? I'm curious about something similar. I have 20M seed URLs that I want to queue and haven't found an example of someone implementing something like this.


octohedron commented Nov 5, 2018

@nsherron90 Yeah, I've been digging deeper into this and finally found out that you can't combine a Twisted app (i.e. Scrapy) with pika, the RabbitMQ client, because it also uses Twisted; asyncio doesn't work either. I haven't tried all the RabbitMQ clients, though.

So, still waiting for a fix and working on it.

I tried several other things as well, like adding a class field to the spider to store the messages as they come in and then fetching from it in start_requests, etc. No luck.

There's one possible solution using pika that would involve disconnecting and re-connecting every few messages, but it isn't optimal.
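
Roughly, that workaround would look something like this: open a short-lived connection, drain a small batch with basic_get, disconnect, and hand the URLs to Scrapy. Just a sketch (queue name, host and batch size are placeholders), and on its own it only pulls one batch per crawl start:

import pika
import scrapy


def drain_batch(queue='test', host='localhost', batch_size=100):
    # Short-lived connection: connect, pull up to batch_size messages, disconnect
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    urls = []
    for _ in range(batch_size):
        method, properties, body = channel.basic_get(queue)
        if method is None:
            # Queue is empty right now
            break
        channel.basic_ack(method.delivery_tag)
        urls.append(body.decode())
    connection.close()
    return urls


class BatchSpider(scrapy.Spider):
    name = 'batch'

    def start_requests(self):
        for url in drain_batch():
            yield scrapy.Request(url, callback=self.parse)

    def parse(self, response):
        self.logger.info("Got response %s", response.url)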


nicksherron commented Nov 6, 2018

@octohedron After staying up 24 hours straight trying to work this out, I decided to go with a Redis queue. See scrapy-redis. It was fairly easy to set up and uses Redis for dedupe and as the scheduler (queue), with persistence.

I set it up on a gcloud compute engine instance and set up redis-server as a daemon:

$ redis-server --daemonize yes

Then I started the spider with nohup (which prevents logging out of the ssh session from killing the spider):

$ nohup scrapy crawl myspider -o out.jl

Once the spider is running, just add seed URLs to Redis and they should start feeding the crawl instance you started with nohup. Here's how I did it:

$ cat urlseed.txt | awk '{print "LPUSH myspider:start_urls", NR, $0}' | redis-cli --pipe

It's been running for over 24 hours now and I have tested the persistence bit; it all seems to be working just as expected. I still need to find out what happens when the queue is finished and you try adding more, and how to set up parallel runs, but I will be digging into that next. Hope this helps.
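
If you want to check how much of the queue is left while it runs (this assumes the default scrapy-redis key naming), something like this works:

$ redis-cli llen myspider:start_urls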

@octohedron

@nsherron90 Glad you made it work for your use case, although that's exactly the issue we're trying to solve: not using Redis for all the URLs, because we have billions of them and it would require too much RAM. Additionally, we want the system to be fully automated; with that approach you need to keep re-running the spiders "manually".

We've been digging deeper into it, and it seems this is not the Scrapy way of doing things: it's not documented, and there's no simple way of keeping the spider running while waiting for messages because of the way Scrapy uses Twisted. It should be supported, though, because it ought to be possible to yield a deferred with a timeout to the reactor, or something like that (I'm not a Twisted expert).

So, what I'm going to do is set up a Redis buffer of, say, 1M URLs max, and feed it from another system that will also launch more spiders if necessary, keeping track of them with scrapyd.
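
Roughly, the feeder side would look something like this. Just a sketch, assuming redis-py and the default scrapy-redis <spider>:start_urls list key; the cap is the buffer size mentioned above:

import time
import redis

BUFFER_KEY = 'myspider:start_urls'  # scrapy-redis default key pattern (assumed)
MAX_BUFFER = 1000000  # cap the buffer at roughly 1M URLs

r = redis.StrictRedis(host='localhost', port=6379)


def feed(url_iterator):
    for url in url_iterator:
        # Wait while the buffer is full so Redis memory stays bounded
        while r.llen(BUFFER_KEY) >= MAX_BUFFER:
            time.sleep(1)
        r.lpush(BUFFER_KEY, url)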

This will of course introduce other challenges like feeding the buffer, slow disk I/O, etc.

Another project to look into that does similar things is scrapy-cluster.

I'm leaving the issue open, because maybe someone with a deeper understanding of Scrapy and Twisted can offer some tips to make this work, even though it's not the way Scrapy is designed to be used; otherwise it would be documented somewhere.


OlgaCh commented Nov 16, 2018

Hi @octohedron, wondering if Scrapy RT may be useful for your task.
Sadly, it hasn't been actively maintained recently.

@elacuesta

Hello @octohedron. One of the problems I see in the above snippets is that the start_requests method needs to return an iterable of scrapy.Request objects.

You could use the signals feature to enqueue more requests when the spider is idle.
The following is a proof of concept that reads from a local file; it could be modified to a scenario in which URLs are read from a different source that is updated while the crawl is running.

Consider you have a list of URLs (continuous.txt):

http://httpbin.org/get?index=1
http://httpbin.org/get?index=2
http://httpbin.org/get?index=3
http://httpbin.org/get?index=4
http://httpbin.org/get?index=5

And a spider:

from scrapy import Spider, Request, signals


class ContinuousSpider(Spider):
    name = 'continuous'

    @classmethod
    def from_crawler(cls, crawler):
        spider = super(ContinuousSpider, cls).from_crawler(crawler)
        spider.fp = open('continuous.txt', 'r')
        crawler.signals.connect(spider.spider_idle, signals.spider_idle)
        return spider

    def start_requests(self):
        yield Request(self._next_url())
    
    def _next_url(self):
        return self.fp.readline().strip()

    def spider_idle(self, spider):
        url = self._next_url()
        if url:
            self.crawler.engine.crawl(Request(url), self)

    def closed(self, reason):
        self.fp.close()

    def parse(self, response):
        return dict(url=response.url)
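
For instance, the same idle-signal pattern reading from a Redis list instead of a file could look roughly like this (an untested sketch; the redis-py client and the continuous:urls key are just placeholders):

import redis

from scrapy import Spider, Request, signals
from scrapy.exceptions import DontCloseSpider


class RedisContinuousSpider(Spider):
    name = 'redis_continuous'

    @classmethod
    def from_crawler(cls, crawler):
        spider = super(RedisContinuousSpider, cls).from_crawler(crawler)
        spider.redis = redis.StrictRedis(host='localhost', port=6379)
        crawler.signals.connect(spider.spider_idle, signals.spider_idle)
        return spider

    def _next_url(self):
        # Pop the next URL, waiting up to 5 seconds for one to arrive.
        # Note: this blocks the reactor while it waits, so keep the timeout short.
        result = self.redis.brpop('continuous:urls', timeout=5)
        return result[1].decode() if result else None

    def start_requests(self):
        url = self._next_url()
        if url:
            yield Request(url)

    def spider_idle(self, spider):
        url = self._next_url()
        if url:
            self.crawler.engine.crawl(Request(url), self)
        # Keep the spider alive even if the queue is momentarily empty
        raise DontCloseSpider

    def parse(self, response):
        return dict(url=response.url)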

More information about scheduling requests directly from the engine can be found in #542.

@octohedron

Thanks for your reply @elacuesta

One of the problems I see in the above snippets is that the start_requests method needs to return an iterable of scrapy.Request objects.

That's the first thing we tried. If you could provide a fully working example with pika or any other RabbitMQ/AMQP client (which should only take a few minutes if you know what you are doing), it would be great 👍

Feel free to use/modify this project https://github.com/octohedron/pikatest, which is a minimal Scrapy and pika/RabbitMQ integration setup.

@true1337

Has anyone tried to integrate the pika consumer implementation on Twisted?

https://github.com/pika/pika/blob/master/examples/twisted_service.py
