Appendix D: Crawl Manager Classes
Previous Chapter: Appendix C: Workflow Manager
- Introduction
- Class hierarchy
- CrawlManager
- PeriodicCrawlManager
- GeneratorCrawlManager
- AsyncSchedulerCrawlManagerMixin
- Parameter types
- Writing and running a crawl manager
A crawl manager schedules spider jobs on Scrapy Cloud and reacts to their outcomes. The classes live in `shub_workflow/crawl.py` and build on `WorkFlowManager` (documented in Appendix C: Workflow Manager), which provides the name / flow-id / owned-jobs / resume machinery they all rely on.
This appendix is the complete reference for the crawl manager classes. For a gentler, example-led introduction read the Crawl Managers tutorial chapter first.
Pick the class by how jobs are produced:
| Class | Schedules |
|---|---|
| `CrawlManager` | one spider job; if looping, exits when it finishes (inheriting its close reason). |
| `PeriodicCrawlManager` | the same spider job again and again; reschedules after each finish. Runs forever. |
| `GeneratorCrawlManager` | a stream of jobs whose arguments come from a generator you write; stops when the generator is exhausted and all jobs have finished. The one you'll use most. |
| `AsyncSchedulerCrawlManagerMixin` | mixed into a `GeneratorCrawlManager` to schedule each cycle's jobs concurrently (asyncio). |

```
WorkFlowManager                  (Appendix C: name + flow id, owned-jobs tracking, resume)
   ▲
CrawlManager                     schedule one spider, check running jobs, outcome hooks
   ▲
   ├── PeriodicCrawlManager      reschedule the same job forever
   └── GeneratorCrawlManager     schedule from set_parameters_gen(), with dedup + retries
          ▲ (mix in for concurrent scheduling)
       AsyncSchedulerCrawlManagerMixin
```

All of them are also SpiderStatsAggregatorMixin (via CrawlManager), so finished spider stats are
aggregated into the manager's own stats automatically.
`CrawlManager` schedules a single spider job. With `loop_mode` set, it stays alive, checks that job each cycle, and shuts down when the job finishes, adopting the job's close reason. Source: `crawl.py`.
| Attribute | Default | Meaning |
|---|---|---|
| `spider` | `None` | target spider. If unset, a positional spider CLI arg is required; if set, that arg disappears. |
| `running_jobs_reverse_check` | `False` | check running jobs newest-first when `True` (matters for throttling logic that wants the most recent job). |

(Plus all the `WorkFlowManager` attributes: `name`, `default_max_jobs`, `flow_id_required`, …)
Command-line arguments: a positional `spider` (only when the `spider` attribute is unset), `--spider-args` (JSON dict), `--job-settings` (JSON dict), and `--units` (int).
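
To make the shape of these arguments concrete, here is a minimal stand-alone sketch built on `argparse`. It is not the library's parser: `build_parser`, its `spider_attr` parameter, and the empty-dict defaults are assumptions for illustration. Only the flag names, their types, and the rule that the positional argument disappears when a spider is set come from the description above.

```python
import argparse
import json


def build_parser(spider_attr=None):
    """Toy parser mirroring the documented flags (hypothetical helper)."""
    parser = argparse.ArgumentParser(description="toy crawl manager CLI")
    if spider_attr is None:
        # The positional argument exists only when no class-level spider is set.
        parser.add_argument("spider")
    # JSON dicts are decoded at parse time; the {} defaults are an assumption.
    parser.add_argument("--spider-args", type=json.loads, default={})
    parser.add_argument("--job-settings", type=json.loads, default={})
    parser.add_argument("--units", type=int)
    return parser


# No class-level spider: the spider name is given on the command line.
args = build_parser().parse_args(
    ["example_spider", "--spider-args", '{"page": "1"}', "--units", "2"]
)

# Class-level spider set: the positional argument is gone.
fixed = build_parser(spider_attr="example_spider").parse_args(["--units", "1"])
```
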
| Method | Purpose |
|---|---|
| `schedule_spider_with_jobargs(job_args_override=None, spider=None)` | schedule one spider job, merging `--spider-args`, the override, and job settings; records it as running. |
| `get_job_settings(override=None)` | the `--job-settings` dict merged with an optional override. |
| `check_running_jobs()` | poll tracked jobs; for each finished one call `finished_metadata_hook`, then `bad_outcome_hook` or `finished_ok_hook`, and aggregate its stats. |
| `workflow_loop()` | one cycle: check running jobs; if none running, schedule the spider. |
| `running_job_hook(jobkey)` | override: react to a still-running job (only the key is passed, to stay cheap). |
| `finished_metadata_hook(jobkey, metadata)` | override: react to any finished job from its metadata. |
| `bad_outcome_hook(spider, outcome, job_args_override, jobkey)` | override: react to a failed outcome. Base sets the manager's close reason to the outcome. |
| `finished_ok_hook(spider, outcome, job_args_override, jobkey)` | override: react to a successful finish. Exactly one of the bad/ok hooks fires per job. |

Set loop_mode (class attr or --loop-mode) to keep it alive; with loop_mode = 0 it schedules
once and exits.
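
The hook order can be illustrated with a small stand-alone model. This is a sketch, not the library's code: `ToyManager`, `check_finished`, the shape of the `finished` dict, and the rule that any close reason other than `"finished"` counts as a failed outcome are all assumptions made for illustration. Only the hook names, their signatures, and their call order come from the method list.

```python
class ToyManager:
    """Stand-in for a crawl manager; it does not import shub_workflow."""

    def __init__(self):
        self.calls = []           # record of hook invocations, in order
        self.close_reason = None

    def finished_metadata_hook(self, jobkey, metadata):
        self.calls.append(("metadata", jobkey))

    def bad_outcome_hook(self, spider, outcome, job_args_override, jobkey):
        # Mirrors the documented base behaviour: adopt the outcome as close reason.
        self.close_reason = outcome
        self.calls.append(("bad", jobkey))

    def finished_ok_hook(self, spider, outcome, job_args_override, jobkey):
        self.calls.append(("ok", jobkey))

    def check_finished(self, finished):
        """finished maps jobkey -> metadata with 'spider' and 'close_reason'."""
        for jobkey, metadata in finished.items():
            self.finished_metadata_hook(jobkey, metadata)
            outcome = metadata["close_reason"]
            # Assumption: anything other than "finished" is a failed outcome.
            if outcome == "finished":
                self.finished_ok_hook(metadata["spider"], outcome, {}, jobkey)
            else:
                self.bad_outcome_hook(metadata["spider"], outcome, {}, jobkey)


manager = ToyManager()
manager.check_finished({
    "1/2/3": {"spider": "example_spider", "close_reason": "finished"},
    "1/2/4": {"spider": "example_spider", "close_reason": "memusage_exceeded"},
})
```

After the call, `manager.calls` holds one metadata entry per job followed by exactly one of the ok/bad entries, and `manager.close_reason` is the failed job's outcome.
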
`PeriodicCrawlManager` is a `CrawlManager` that reschedules the same job with the same parameters after each one finishes, forever. It requires `loop_mode`. It neutralizes the base `bad_outcome_hook` (a failed job doesn't stop it) and doesn't finish itself when a child job completes. Override the hooks for custom reactions.
`GeneratorCrawlManager` is the workhorse. It schedules a stream of jobs whose arguments come from the generator you implement, respecting `max_running_jobs`, until the generator is exhausted and all jobs have finished. Source: `crawl.py`.
You must override set_parameters_gen() — a generator yielding one dict per job to schedule.
Each dict is spider arguments ({arg: value}); recognized Scrapy Cloud keys are pulled out and
handled specially: spider (override the target per job), units, tags, job_settings,
project_id (cross-project scheduling). Everything else becomes the job's spider args.
```python
from shub_workflow.crawl import GeneratorCrawlManager


class CrawlManager(GeneratorCrawlManager):
    name = "crawl"
    loop_mode = 120
    default_max_jobs = 4
    spider = "example_spider"  # default; a yielded "spider" key overrides per job

    def set_parameters_gen(self):
        # list_folder is not defined in this snippet; it stands for any helper
        # that lists the files under the given path.
        for input_file in list_folder("s3://bucket/inputs"):
            yield {"input_file": input_file}  # → spider arg input_file=<...>
```

If `spider` is unset (`None`), every yielded dict must include a `spider` key; set it to a default to make that optional, or to `""` to always require it explicitly.
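
How a yielded dict is split can be sketched without the library. The function below is an illustrative model, not the actual implementation: `split_schedule_args`, `RECOGNIZED_KEYS`, and the `ValueError` are hypothetical names and choices. The set of recognized keys and the rule that everything else becomes a spider argument come from the description above.

```python
RECOGNIZED_KEYS = ("spider", "units", "tags", "job_settings", "project_id")


def split_schedule_args(yielded, default_spider=None):
    """Separate recognized Scrapy Cloud keys from plain spider arguments."""
    params = dict(yielded)  # copy so the caller's dict is not mutated
    job = {key: params.pop(key) for key in RECOGNIZED_KEYS if key in params}
    # A yielded "spider" key wins over the class-level default.
    spider = job.pop("spider", default_spider)
    if not spider:
        # Covers both None and "": the dict had to name a spider and did not.
        raise ValueError("no spider: set a default or yield a 'spider' key")
    job["spider_args"] = params  # whatever is left is a spider argument
    return spider, job


spider, job = split_schedule_args(
    {"input_file": "a.jl", "units": 2, "spider": "other_spider"},
    default_spider="example_spider",
)
```

Here `spider` is `"other_spider"` and `job` is `{"units": 2, "spider_args": {"input_file": "a.jl"}}`.
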
| Concern | Detail |
|---|---|
| Per-cycle scheduling | `workflow_loop()` checks running jobs, then schedules up to `max_running_jobs` − running new jobs from the generator. |
| De-duplication | each job's `(spider, spider_args)` hashes to a unique id kept in a bloom filter (`create_dupe_filter()`, override to change capacity/error rate); a dict producing an already-seen id is skipped. So re-yielding identical params won't reschedule. |
| Out-of-band jobs | `add_job(spider, job_args_override)` queues an extra job ahead of the generator (used by the retry logic). |
| Delayed jobs | override `can_schedule_job_with_params(params) -> bool`; returning `False` defers a job to a delayed queue, retried in later cycles (e.g. per-domain concurrency limits). |
| Sequencing | each scheduled job gets a `JOBSEQ=` tag; retries get `.rN` suffixes. |

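
The per-cycle scheduling and de-duplication rows can be modelled in a few lines. This is a simplified sketch under stated assumptions: a plain `set` stands in for the bloom filter, SHA-1 over a JSON dump stands in for whatever hash the library uses, and `job_id`, `unique_jobs`, and `next_batch` are hypothetical names.

```python
import hashlib
import json
from itertools import islice


def job_id(spider, spider_args):
    """Stable id for a (spider, spider_args) pair (hash choice is an assumption)."""
    payload = json.dumps([spider, spider_args], sort_keys=True)
    return hashlib.sha1(payload.encode()).hexdigest()


def unique_jobs(params_gen, spider, seen):
    """Yield only parameter dicts whose id has not been seen before."""
    for spider_args in params_gen:
        uid = job_id(spider, spider_args)
        if uid in seen:  # a plain set stands in for the bloom filter
            continue
        seen.add(uid)
        yield spider_args


def next_batch(jobs, max_running_jobs, running):
    """Take at most the number of free slots from the job stream."""
    free_slots = max(0, max_running_jobs - running)
    return list(islice(jobs, free_slots))


seen = set()
params = iter([{"page": 1}, {"page": 2}, {"page": 1}, {"page": 3}])
jobs = unique_jobs(params, "example_spider", seen)

# Cycle 1: nothing running, two free slots.
first_cycle = next_batch(jobs, max_running_jobs=2, running=0)
# Cycle 2: one job still running, so one free slot; the repeated page=1 is skipped.
second_cycle = next_batch(jobs, max_running_jobs=2, running=1)
```

Because the generator is consumed lazily, the stream is only advanced as far as the free slots allow, and a repeated dict never consumes a slot.
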
Retries are built into `bad_outcome_hook`. Set `MAX_RETRIES` (default 0 = no retries). On a failed outcome
other than cancelled, while under MAX_RETRIES, the job is re-added (via add_job) with a
retry_num spider arg and a RETRIED_FROM= tag. Customize per-retry changes by overriding
get_retry_override(spider, outcome, job_args_override, jobkey) -> JobParams (return extra
spider_args/job_settings/tags/etc.); raise StopRetry from it to abort retrying a given job.
Or override bad_outcome_hook entirely for bespoke logic (call add_job(...) to requeue).
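
The retry decision can be modelled as a stand-alone function. This is a sketch of the rules just described, not the library's code: `plan_retry` is a hypothetical helper, `StopRetry` is redefined locally as a stand-in, the outcome names `"banned"` and `"memusage_exceeded"` are only examples, and using the job key as the `RETRIED_FROM=` value and starting `retry_num` at 1 are assumptions.

```python
class StopRetry(Exception):
    """Local stand-in for the library's StopRetry exception."""


MAX_RETRIES = 2


def get_retry_override(spider, outcome, job_args_override, jobkey):
    """Hypothetical policy: more units after a memory failure, no retry on bans."""
    if outcome == "banned":
        raise StopRetry
    if outcome == "memusage_exceeded":
        return {"units": 2}
    return {}


def plan_retry(spider, outcome, job_args_override, jobkey):
    """Return the job parameters to re-add, or None when no retry should happen."""
    old_args = job_args_override.get("spider_args", {})
    retry_num = int(old_args.get("retry_num", 0))
    if outcome == "cancelled" or retry_num >= MAX_RETRIES:
        return None
    try:
        override = get_retry_override(spider, outcome, job_args_override, jobkey)
    except StopRetry:
        return None
    spider_args = {**old_args, **override.get("spider_args", {})}
    spider_args["retry_num"] = retry_num + 1
    tags = [
        *job_args_override.get("tags", []),
        *override.get("tags", []),
        f"RETRIED_FROM={jobkey}",  # assumption: the tag carries the failed job key
    ]
    return {**job_args_override, **override, "spider_args": spider_args, "tags": tags}


retry = plan_retry(
    "example_spider", "memusage_exceeded", {"spider_args": {"page": 1}}, "1/2/3"
)
```

In the real manager the returned parameters would be handed to `add_job(...)`; here they are simply returned so the decision can be inspected.
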
Mix `AsyncSchedulerCrawlManagerMixin` in front of a `GeneratorCrawlManager` to schedule each cycle's jobs concurrently (`asyncio.gather`) instead of one by one. This is valuable when a cycle schedules many jobs and scheduling latency dominates.
```python
from shub_workflow.crawl import AsyncSchedulerCrawlManagerMixin, GeneratorCrawlManager


class CrawlManager(AsyncSchedulerCrawlManagerMixin, GeneratorCrawlManager):
    name = "crawl"
    loop_mode = 30
    spider = "example_spider"

    def set_parameters_gen(self):
        ...
```

The mixin overrides `workflow_loop` and `schedule_spider_with_jobargs` to be coroutines. Because `run()` is now async, the entry point must use `asyncio.run(script.run())` (see below). `set_parameters_gen()` itself stays an ordinary (synchronous) generator.
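
Why concurrent scheduling helps when latency dominates can be seen in a toy comparison. Nothing here touches Scrapy Cloud: `schedule_job` is a hypothetical stand-in whose `asyncio.sleep` models the latency of one scheduling request, and the 0.05 s figure is arbitrary.

```python
import asyncio
import time


async def schedule_job(params, latency=0.05):
    """Stand-in for a scheduling call; the sleep models API latency."""
    await asyncio.sleep(latency)
    return f"job-for-page-{params['page']}"


async def schedule_sequentially(batch):
    # Each request waits for the previous one to complete.
    return [await schedule_job(params) for params in batch]


async def schedule_concurrently(batch):
    # gather returns results in the same order as the awaitables it was given.
    return await asyncio.gather(*(schedule_job(params) for params in batch))


async def main():
    batch = [{"page": n} for n in range(10)]

    start = time.perf_counter()
    sequential = await schedule_sequentially(batch)
    sequential_time = time.perf_counter() - start

    start = time.perf_counter()
    concurrent = await schedule_concurrently(batch)
    concurrent_time = time.perf_counter() - start

    return sequential, sequential_time, list(concurrent), concurrent_time


sequential, sequential_time, concurrent, concurrent_time = asyncio.run(main())
```

Both approaches return the same jobs in the same order; the sequential run pays the latency once per job, while the concurrent run pays it roughly once per cycle.
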
From `crawl.py`:
- `JobParams` (`TypedDict`): `units`, `tags`, `job_settings`, `project_id`, `spider_args`. These are the per-job overrides passed around the hooks.
- `FullJobParams` = `JobParams` + `spider`. This is what `add_job()` and the internal scheduler use.
- `ScheduleArgs` = the raw dict your `set_parameters_gen()` yields (flat spider args plus any recognized SC keys).
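
As a rough picture of how these types relate, the sketch below redeclares them locally. It is not copied from `crawl.py`: the field names come from the list above, but the field types, the use of `total=False`, and modelling `ScheduleArgs` as a plain `Dict[str, Any]` are assumptions.

```python
from typing import Any, Dict, List, TypedDict


class JobParams(TypedDict, total=False):
    # Every field is optional here (assumption): a hook may override only some.
    units: int
    tags: List[str]
    job_settings: Dict[str, Any]
    project_id: int
    spider_args: Dict[str, str]


class FullJobParams(JobParams):
    # JobParams plus the spider name.
    spider: str


# Flat spider args mixed with any recognized Scrapy Cloud keys.
ScheduleArgs = Dict[str, Any]

yielded: ScheduleArgs = {"page": "1", "units": 2}
full: FullJobParams = {
    "spider": "example_spider",
    "units": 2,
    "spider_args": {"page": "1"},
}
```

At runtime a `TypedDict` instance is an ordinary `dict`; the declarations matter only to static type checkers.
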
A generator crawl manager and its entry point:
```python
import logging

from shub_workflow.crawl import GeneratorCrawlManager


class CrawlManager(GeneratorCrawlManager):
    name = "crawl"
    loop_mode = 60  # seconds between cycles; required for periodic/generator managers
    default_max_jobs = 4  # at most 4 spider jobs running at once
    spider = "example_spider"

    @property
    def description(self):
        return "Schedule example_spider for each input."

    def set_parameters_gen(self):
        for n in range(10):
            yield {"page": n}  # → spider arg page=<n>


if __name__ == "__main__":
    from shub_workflow.utils import get_kumo_loglevel

    logging.basicConfig(format="%(asctime)s %(name)s [%(levelname)s]: %(message)s", level=get_kumo_loglevel())
    CrawlManager().run()
```

For an `AsyncSchedulerCrawlManagerMixin` manager the last line is `asyncio.run(CrawlManager().run())` instead. As with any shub-workflow script, register the file in your project's `setup.py` so it can be deployed and invoked on Scrapy Cloud as `py:crawlmanager.py`; deployment itself is out of scope here.