
Appendix B: Script Classes

Martin Olveyra edited this page Jun 18, 2026 · 3 revisions

Previous Chapter: Appendix A: Classes Diagram


Introduction

A shub-workflow script is a plain Python program, subclassing one of the base classes in shub_workflow/script.py, that either runs on Scrapy Cloud (as a periodic job, a long-running manager, etc.) or operates on Scrapy Cloud from anywhere (scheduling spiders/scripts, scanning and querying jobs, aggregating stats). Crawl managers, monitors, schedulers, consumers, deliverers and ad-hoc CLIs (e.g. the scanjobs utility) are all built on these classes.

They give you, for free: argument parsing (with reusable command-line "programs"), project-id resolution, the ScrapinghubClient, job scheduling with flow/name tagging, job querying with pagination and retries, a Scrapy stats collector, and (for the loop variants) a managed run loop.

This appendix documents the four classes you subclass directly:

| Class | Use it when |
|---|---|
| ArgumentParserScript | you only need argparse + the PROGRAMS shortcut mechanism, with no Scrapy Cloud access (rare; it's the base the others build on). |
| BaseScript | a one-shot script: parse args, do the work in run(), exit. The default choice. |
| BaseLoopScript | a script that must repeat work on an interval / run continuously until told to stop. |
| BaseLoopScriptAsyncMixin | a loop script whose cycle is asyncio-based (e.g. schedules many jobs concurrently). |

Class hierarchy

ArgumentParserScript              # argparse + PROGRAMS (-g/-v); no SC access
        ▲
        │   (+ SCProjectClass: ScrapinghubClient, get_project)
        │
   BaseScript                     # the workhorse: args, project id, scheduling, job queries, stats
        ▲
        │
   BaseLoopScript                 # adds the managed run loop (loop_mode, workflow_loop, hooks)
        ▲
        │   (mix in for an async cycle)
   BaseLoopScriptAsyncMixin       # overrides run()/scheduling to be asyncio-based

Each class has a matching *Protocol (ArgumentParserScriptProtocol, BaseScriptProtocol, BaseLoopScriptProtocol) — typing-only Protocol classes that declare the public interface. You use them when writing a mixin that needs to call base methods but should not itself inherit the implementation (see BaseScript).

ArgumentParserScript

The foundation. It owns self.args and the argument parser, and adds the program shortcut mechanism. On construction it calls parse_args(), which:

  1. builds an ArgumentParser titled with the script's description,
  2. calls add_argparser_options() (override this to add your arguments — always call super()),
  3. registers self.PROGRAMS,
  4. parses sys.argv, expanding -g <alias> / -v key:val,... into the program's stored command line (see the ScanJobs chapter for the PROGRAMS format and the {var} / {{ }} rules).
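
The construction-time flow in steps 1, 2 and 4 can be approximated with the standard library alone. The sketch below is a simplified stand-in, not the shub-workflow implementation: MiniArgScript, its argv parameter and the --dry-run option are hypothetical names, and the PROGRAMS handling (step 3 and the -g/-v expansion) is left out.

```python
import argparse
from typing import List, Optional


class MiniArgScript:
    """Hypothetical stand-in for the pattern; not part of shub-workflow."""

    @property
    def description(self) -> str:
        return "Base description."

    def __init__(self, argv: Optional[List[str]] = None):
        # Parsing happens at construction time, so self.args exists before run().
        self.args = self.parse_args(argv)

    def add_argparser_options(self) -> None:
        # Base hook: subclasses extend it and call super() first.
        self.argparser.add_argument("--dry-run", action="store_true")

    def parse_args(self, argv: Optional[List[str]]) -> argparse.Namespace:
        self.argparser = argparse.ArgumentParser(description=self.description)
        self.add_argparser_options()
        # argv=None makes argparse fall back to sys.argv[1:].
        return self.argparser.parse_args(argv)


class Greeter(MiniArgScript):
    @property
    def description(self) -> str:
        return "Greets a target."

    def add_argparser_options(self) -> None:
        super().add_argparser_options()  # keep the base options
        self.argparser.add_argument("target")
        self.argparser.add_argument("--limit", type=int, default=100)

    def run(self) -> str:
        return f"{self.args.target}:{self.args.limit}:{self.args.dry_run}"


script = Greeter(["world", "--limit", "5"])
result = script.run()
```

Passing an explicit argument list, as the sketch does, is only a convenience for trying the pattern without touching sys.argv.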

Set the script's purpose via the description property and your arguments via add_argparser_options():

class MyScript(ArgumentParserScript):
    @property
    def description(self):
        return "What this script does."

    def add_argparser_options(self):
        super().add_argparser_options()                 # keep -g/-v
        self.argparser.add_argument("target")           # positional
        self.argparser.add_argument("--limit", type=int, default=100)

    def run(self):                                       # you MUST implement run()
        print(self.args.target, self.args.limit)

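run() is the entry point of every script class, and you invoke it yourself from the __main__ block. It is abstract in ArgumentParserScript and BaseScript, so you must implement it there; the loop classes described later in this appendix provide run() for you.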

BaseScript

The class you'll subclass most. It is ArgumentParserScript + SCProjectClass (which provides self.client, a ScrapinghubClient, and get_project()), wired together with Scrapy project settings, a spider loader, a stats collector and an FSHelper.

Lifecycle

Constructing the script runs the following steps, in order:

  1. load the Scrapy project_settings,
  2. build the spider loader,
  3. parse_args() (which resolves the target project id and the own project id),
  4. optionally load live Scrapy Cloud settings (--load-sc-settings),
  5. set the flow id / name and tag the job,
  6. build the stats collector and the FSHelper.

Then you call run().

Class attributes

| Attribute | Default | Meaning |
|---|---|---|
| name | "" | logical name; when set, added as a NAME= tag and propagated to children as PARENT_NAME=. |
| project_required | True | if False, the script won't error when no project id is available (for scripts that don't touch SC). |
| default_project_id | None | default for --project-id; if None, auto-detected (resolve_project_id). |
| flow_id_required | False | if True, the script must run within a flow id (auto-generated via generate_flow_id() if absent). |
| children_tags | None | extra tags added to every job this script schedules. |
| PROGRAMS | {} | command-line shortcut definitions (inherited). |
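
As an illustration of how name, the flow id and children_tags combine, the hypothetical helper below builds the tag list a child job would receive. It follows the tag formats described in this appendix (FLOW_ID=, PARENT_NAME=) but is not a shub-workflow function; the ordering and the de-duplication are assumptions of the sketch.

```python
from typing import Iterable, List, Optional


def build_child_tags(
    flow_id: Optional[str],
    parent_name: str = "",
    children_tags: Optional[Iterable[str]] = None,
    extra_tags: Optional[Iterable[str]] = None,
) -> List[str]:
    """Hypothetical helper: compose the tags for a scheduled child job."""
    tags: List[str] = []
    if flow_id:
        tags.append(f"FLOW_ID={flow_id}")
    if parent_name:
        tags.append(f"PARENT_NAME={parent_name}")
    # Class-level children_tags first, then per-call extras, without duplicates.
    for tag in list(children_tags or []) + list(extra_tags or []):
        if tag not in tags:
            tags.append(tag)
    return tags


child_tags = build_child_tags("abc123", "crawlmanager", ["nightly"], ["nightly", "retry"])
```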

Arguments it adds

--project-id (numeric or a scrapinghub.yml keyword; the target project where child jobs are scheduled), --flow-id, --children-tag/-t (repeatable), --load-sc-settings, plus the inherited -g/-v.

Two project ids. self.project_id is the target project (where you schedule/query jobs, from --project-id). self._own_project_id is where the script itself is running. They differ when a script running in project A schedules/scans jobs in project B.

What it gives you

  • Scheduling: schedule_spider(spider, tags=, units=, project_id=, **kwargs) and schedule_script(cmd, tags=, project_id=, units=, meta=) (a script name is normalized to the py:<name>.py form). Both return a JobKey or None. Duplicate/again errors route through the overridable handle_schedule_duplicate_error() / handle_schedule_error() hooks.
  • Job querying: get_jobs(project_id=None, **kwargs) is a paginated, de-duplicated generator over JobDicts (handles SC's 1000-per-page limit and count); get_jobs_with_tags(spider, tags, ...) filters by a set of tags; is_running(jobkey), is_finished(jobkey) (returns close reason), finish(jobkey=None, close_reason=) (finishes own job if jobkey is None).
  • Tags & flow: append_flow_tag() (a tag transmitted to children), add_job_tags() / remove_job_tags(), get_job_tags(), get_keyvalue_job_tag(). Flow id and name are stored as FLOW_ID= / NAME= tags and propagated to scheduled children, so a whole workflow shares a flow id.
  • Stats: self.stats (a Scrapy StatsCollector), upload_stats(), print_stats().
  • SC project settings: get_sc_project_settings() reads the live project settings from the dashboard API (used for runtime config).
  • Spiders: get_canonical_spidername() (maps spiders sharing a site to a common canonical_name), get_project_running_spiders(...).
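
The pagination and de-duplication that get_jobs() performs can be pictured with a small generator. This is a sketch of the general technique, assuming a hypothetical fetch_page(start, count) callable that returns at most one page of job dicts per call; it is not the library's code, and the page size of 1000 is taken from the limit mentioned above.

```python
from typing import Callable, Dict, Iterator, List, Optional, Set

PAGE_SIZE = 1000
JobDict = Dict[str, str]


def iter_jobs(
    fetch_page: Callable[[int, int], List[JobDict]],
    count: Optional[int] = None,
    page_size: int = PAGE_SIZE,
) -> Iterator[JobDict]:
    """Yield unique jobs page by page, stopping after `count` jobs if given."""
    seen: Set[str] = set()
    start = 0
    yielded = 0
    while count is None or yielded < count:
        page = fetch_page(start, page_size)
        for job in page:
            if job["key"] in seen:
                continue  # the same job can show up on two pages
            seen.add(job["key"])
            yield job
            yielded += 1
            if count is not None and yielded >= count:
                return
        if len(page) < page_size:
            return  # a short page means there is nothing left to fetch
        start += len(page)


# Fake backend with one repeated job key, queried three jobs at a time.
_backend = [{"key": f"1/2/{i}"} for i in (1, 2, 3, 3, 4, 5, 6)]


def fake_fetch(start: int, count: int) -> List[JobDict]:
    return _backend[start:start + count]


all_keys = [job["key"] for job in iter_jobs(fake_fetch, page_size=3)]
first_two = [job["key"] for job in iter_jobs(fake_fetch, count=2, page_size=3)]
```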

All Scrapy-Cloud-touching calls are wrapped with a retry decorator (dash_retry_decorator).
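
For readers unfamiliar with the pattern, a retry decorator of this kind can be sketched as follows. This is a generic standard-library version with hypothetical names (retry, max_attempts, delay); dash_retry_decorator itself may catch different exceptions and use different back-off and limits.

```python
import functools
import time
from typing import Any, Callable, Tuple, Type


def retry(
    exceptions: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_attempts: int = 3,
    delay: float = 0.0,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Retry the wrapped call on the given exceptions, up to max_attempts times."""

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    time.sleep(delay * attempt)  # linear back-off

        return wrapper

    return decorator


calls = {"n": 0}


@retry(max_attempts=3)
def flaky_call() -> str:
    # Fails twice with a transient error, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


outcome = flaky_call()
```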

The project-base-mixin pattern

Projects usually centralize shared options/helpers in a mixin and have each concrete script inherit from MixinFirst, BaseScript. The mixin inherits BaseScriptProtocol (the typing-only interface) so it can call base methods and type-check, without re-inheriting the implementation:

from shub_workflow.script import BaseScript, BaseScriptProtocol

class MyProjectScriptMixin(BaseScriptProtocol):
    def add_argparser_options(self):
        super().add_argparser_options()
        self.argparser.add_argument("--env", default="prod")

    def shared_helper(self): ...

class MyScript(MyProjectScriptMixin, BaseScript):   # mixin first, BaseScript last
    def run(self): ...

BaseLoopScript

BaseScript plus a managed run loop. Instead of doing everything in run(), you implement workflow_loop(), which is called repeatedly; run() itself is provided.

How the loop works

run() calls _on_start() (→ your on_start() hook, then enables the loop), then repeatedly calls workflow_loop(). After each cycle: if workflow_loop() returned truthy and --loop-mode is set to N seconds, it sleeps N seconds and loops again; otherwise it stops. On exit it calls _close() (→ your on_close() hook, sets a close reason, uploads & prints stats). Stats are also uploaded periodically during the loop (stats_interval).

class MyLoop(BaseLoopScript):
    loop_mode = 60          # default seconds between cycles (0 = run once); --loop-mode overrides
    max_running_time = 0    # if >0, auto-stop after this many seconds

    def on_start(self):
        ...                 # one-time setup

    def workflow_loop(self) -> bool:
        ...                 # one cycle of work
        return True         # True = keep looping; False = stop now

    def on_close(self):
        ...                 # one-time teardown

Attributes & helpers

| Attribute / method | Meaning |
|---|---|
| loop_mode | default seconds between cycles; 0 = run the body once. Overridden by --loop-mode. |
| max_running_time | if >0, the loop self-terminates after this many seconds (also --max-running-time). |
| stats_interval | seconds between periodic stats uploads during the loop (default 120). |
| on_start() / on_close() | one-time hooks around the loop. |
| base_loop_tasks() | optional work run at the start of every cycle (before workflow_loop()). |
| set_close_reason() / get_close_reason() / is_closed() | control / inspect the close reason. |

Return False from workflow_loop() (or call code that sets workflow_loop_enabled = False) for an immediate stop; for continuous operation, return True and run with a non-zero loop mode.
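
The loop contract described in this section can be mimicked in a few lines. MiniLoopScript below is a hypothetical stand-in, not the BaseLoopScript implementation: it models only on_start(), workflow_loop(), the loop_mode sleep and on_close(), and leaves out base_loop_tasks(), max_running_time, close reasons and stats uploads.

```python
import time
from typing import List


class MiniLoopScript:
    """Hypothetical stand-in for the run-loop contract; not shub-workflow code."""

    loop_mode: float = 0  # seconds between cycles; 0 = run a single cycle

    def on_start(self) -> None: ...

    def on_close(self) -> None: ...

    def workflow_loop(self) -> bool:
        raise NotImplementedError

    def run(self) -> None:
        self.on_start()
        while True:
            keep_going = self.workflow_loop()
            # Stop on a falsy return value, or after one cycle when loop_mode is 0.
            if not keep_going or not self.loop_mode:
                break
            time.sleep(self.loop_mode)
        self.on_close()


class Countdown(MiniLoopScript):
    loop_mode = 0.01

    def __init__(self, cycles: int):
        self.remaining = cycles
        self.events: List[str] = []

    def on_start(self) -> None:
        self.events.append("start")

    def workflow_loop(self) -> bool:
        self.remaining -= 1
        self.events.append("cycle")
        return self.remaining > 0  # False once the countdown is exhausted

    def on_close(self) -> None:
        self.events.append("close")


loop = Countdown(cycles=3)
loop.run()

once = Countdown(cycles=5)
once.loop_mode = 0  # single cycle, even though workflow_loop() returns True
once.run()
```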

BaseLoopScriptAsyncMixin

A mixin for loop scripts whose cycle is asyncio-based — e.g. a crawl manager that schedules many jobs concurrently per cycle. Mix it in front of BaseLoopScript and make workflow_loop a coroutine:

import asyncio
from shub_workflow.script import BaseLoopScript, BaseLoopScriptAsyncMixin

class MyAsyncLoop(BaseLoopScriptAsyncMixin, BaseLoopScript):
    loop_mode = 30

    async def workflow_loop(self) -> bool:
        jobkey = await self.async_schedule_spider("example_spider")
        ...
        return True

It overrides run() to be a coroutine that drives the loop with await asyncio.sleep(...) between cycles, and provides async scheduling: async_schedule_spider(...) / _async_schedule_job(...) (which run the blocking SC client calls in an executor). async_add_job_tags() is likewise available on BaseScript. Because run() is now async, the entry point must launch it with asyncio.run(...) (see below). The synchronous BaseScript/BaseLoopScript machinery (job queries, tags, stats) is still available and used inside the async loop.
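
The "blocking call in an executor" technique mentioned above looks roughly like this in plain asyncio. The function names and the fake job keys are hypothetical, and blocking_schedule merely simulates a slow client call; the sketch shows the general pattern, not the code behind async_schedule_spider().

```python
import asyncio
import time
from typing import List


def blocking_schedule(spider: str) -> str:
    """Stand-in for a blocking client call; returns a fake job key."""
    time.sleep(0.05)
    return f"123/1/{len(spider)}"


async def async_schedule(spider: str) -> str:
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default thread pool so the event loop stays free.
    return await loop.run_in_executor(None, blocking_schedule, spider)


async def one_cycle(spiders: List[str]) -> List[str]:
    # Schedule all spiders concurrently; gather keeps results in input order.
    return await asyncio.gather(*(async_schedule(s) for s in spiders))


jobkeys = asyncio.run(one_cycle(["a", "bb", "ccc"]))
```

Because the three calls overlap in the thread pool, the cycle takes about as long as one blocking call rather than the sum of all three.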

Writing and running a script

A complete minimal script and its entry point:

import logging
from shub_workflow.script import BaseScript


class MyScript(BaseScript):

    @property
    def description(self):
        return "Schedule example_spider once."

    def add_argparser_options(self):
        super().add_argparser_options()
        self.argparser.add_argument("--units", type=int, default=1)

    def run(self):
        jobkey = self.schedule_spider("example_spider", units=self.args.units)
        self.stats.inc_value("scheduled")
        logging.getLogger(__name__).info("Scheduled %s", jobkey)


if __name__ == "__main__":
    import logging
    from shub_workflow.utils import get_kumo_loglevel

    logging.basicConfig(format="%(asctime)s %(name)s [%(levelname)s]: %(message)s", level=get_kumo_loglevel())
    script = MyScript()
    script.run()

The __main__ block is boilerplate shared by all shub-workflow scripts: configure logging (get_kumo_loglevel() honours the Scrapy Cloud log level), instantiate, and call run(). For an async (BaseLoopScriptAsyncMixin) script the last line is asyncio.run(script.run()) instead.

Running locally vs. on Scrapy Cloud. Run locally with python my_script.py --project-id=<id-or-keyword> ... (the project id is required unless you set project_required = False). To deploy and schedule it on Scrapy Cloud, the script must be packaged with the project (registered in the project's setup.py) and is then invoked as py:my_script.py; deployment itself is out of scope here.


Next Chapter: Appendix C: Workflow Manager
