Skip to content

wirecodex/SimpleQueue

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 
 
 

Repository files navigation

SimpleQueue

Alpha — v0.1.0. This module is in early testing. The API may change before a stable release. Feedback and bug reports are welcome.

A background job queue for ProcessWire. Push tasks onto the queue from any template or hook, and let LazyCron process them automatically in the background — with priority levels, delayed execution, automatic retry with exponential backoff, and your choice of file or database storage.

Can be installed standalone or as part of the SimpleWire suite.

Features

  • Two Storage Drivers: File-based JSON (zero config) or MySQL database (high throughput)
  • Priority Levels: high, normal, low — processed highest-priority-first, oldest-first within the same level
  • Delayed Execution: Schedule jobs for a specific future time via delay (seconds) or at (datetime string)
  • Auto Retry: Exponential backoff on failure — retries at 2 min, 4 min, 8 min… up to 1 hour intervals
  • Failure Tracking: Failed jobs are preserved with their error message and attempt count for inspection and manual retry
  • LazyCron Integration: Automatic background processing on a configurable interval (every 30s to hourly)
  • Pause / Resume: Persistently halt and resume processing across requests without dropping jobs
  • Auto-Prune: Automatic cleanup of old completed jobs on a daily schedule
  • Zero Dependencies: No external libraries, no Redis, no system queues required

Installation

  1. Copy the SimpleQueue folder to /site/modules/
  2. Go to Modules → Refresh in the ProcessWire admin
  3. Install SimpleQueue

On install, the module creates:

  • /site/jobs/ — where your job handler files live
  • /site/assets/cache/SimpleWire/Queue/ — file driver storage (four sub-directories: pending, processing, completed, failed)

On uninstall, the cache directory and database table (if using DatabaseDriver) are removed automatically.

Requirements: ProcessWire 3.0.200+, PHP 8.1+


Quick Access

// Via the shorthand function (recommended)
$queue = queue();

// Via the explicit module function
$queue = simplequeue();

// Via the ProcessWire API variable
$queue = wire()->simplequeue;

How It Works

  1. Your application code calls queue()->push(...) to add a job
  2. The job is persisted to storage (file or database) with its payload, priority, and schedule
  3. LazyCron fires on the configured interval and calls queue()->process()
  4. The Worker picks up ready pending jobs (highest priority, oldest first), loads the handler file from /site/jobs/, and calls handle($data)
  5. Successful jobs are moved to completed; failures trigger retry with exponential backoff; jobs that exhaust their retry budget are moved to failed

Job Handlers

A job handler is a PHP file in /site/jobs/. It must define a class with a handle(array $data) method. Use the ProcessWire namespace so you can call wire() directly.

<?php
// /site/jobs/SendWelcomeEmailJob.php
namespace ProcessWire;

class SendWelcomeEmailJob
{
    public function handle(array $data): void
    {
        $mail = wireMail();
        $mail->to($data['to']);
        $mail->subject($data['subject']);
        $mail->body($data['body'] ?? '');
        $mail->send();

        wire('log')->save('emails', "Sent to {$data['to']}");
    }
}

Rules:

  • File name must match class name exactly: SendWelcomeEmailJob.phpclass SendWelcomeEmailJob
  • The handle() method receives the $data array you passed to push()
  • Throw an exception to signal failure — the Worker catches it and triggers retry logic
  • Handlers without namespace are also supported (the Worker tries ProcessWire\ClassName first, then bare ClassName)

Pushing Jobs

$jobId = queue()->push(string $handler, array $data = [], array $options = []);

Returns the job ID string on success, false on failure.

Basic Push

queue()->push('SendWelcomeEmailJob', [
    'to'      => 'jane@example.com',
    'subject' => 'Welcome to the site!',
    'body'    => 'Thanks for signing up.',
]);

With Options

queue()->push('SendWelcomeEmailJob', $data, [
    'priority'     => 'high',              // 'high' | 'normal' | 'low'
    'delay'        => 300,                 // seconds from now
    'at'           => '2026-06-01 09:00',  // specific datetime (overrides delay)
    'max_attempts' => 5,                   // retry limit (default: 3)
]);
Option Type Default Description
priority string 'normal' 'high', 'normal', or 'low'
delay int 0 Seconds to wait before the job becomes ready
at int|string Unix timestamp or date string (e.g. '2026-06-01 09:00')
max_attempts int 3 How many times to attempt before marking as failed

Priority Constants

use SimpleWire\Queue\Job;

queue()->push('MyJob', $data, ['priority' => Job::PRIORITY_HIGH]);
queue()->push('MyJob', $data, ['priority' => Job::PRIORITY_NORMAL]);
queue()->push('MyJob', $data, ['priority' => Job::PRIORITY_LOW]);

Processing

Jobs are processed automatically via LazyCron. You can also trigger processing manually:

$results = queue()->process();     // up to 10 jobs (default)
$results = queue()->process(50);   // up to 50 jobs

process() returns:

[
    'processed' => 10,
    'succeeded' => 9,
    'failed'    => 1,
    'errors'    => [
        ['job_id' => 'job_...', 'handler' => 'SendWelcomeEmailJob', 'error' => 'Connection refused'],
    ],
]

Retry Logic

When a handler throws an exception, the Worker:

  1. Increments the attempt counter on the job
  2. If attempts < max_attempts — reschedules with exponential backoff: min(3600, 2^attempts × 60) seconds
  3. If attempts >= max_attempts — marks the job as failed permanently

Backoff schedule (default 3 attempts):

Attempt Delay before retry
1 2 minutes
2 4 minutes
3 — permanently failed

Inspecting the Queue

// Counts by status
$stats = queue()->stats();
// ['pending' => 5, 'processing' => 1, 'completed' => 142, 'failed' => 2, 'total' => 150]

// Jobs by status
$pending    = queue()->pending(20);    // newest first
$processing = queue()->processing();
$completed  = queue()->completed(50);
$failed     = queue()->failed(10);

// Specific job
$job = queue()->get($jobId);

Job Properties

Every method that returns jobs gives you Job objects. Available properties:

Property Type Description
id string Unique job ID (e.g. job_67f6d8a12345.12)
handler string Handler class name
data array Payload passed to handle()
priority string high, normal, or low
status string pending, processing, completed, or failed
attempts int Number of execution attempts so far
max_attempts int Maximum allowed attempts
created_at int Unix timestamp when the job was pushed
execute_at int Unix timestamp when the job becomes ready
processed_at int|null Unix timestamp of last processing attempt
last_error string|null Error message from the last failure

Helper methods:

$job->isReady();          // bool — execute_at <= now
$job->isDelayed();        // bool — execute_at > now
$job->canRetry();         // bool — attempts < max_attempts
$job->getDelay();         // int  — seconds until execution (0 if ready)
$job->getAge();           // int  — seconds since creation
$job->getPriorityLabel(); // string — 'High', 'Normal', 'Low'
$job->getStatusLabel();   // string — 'Pending', 'Processing', etc.

Managing Jobs

// Retry a failed job (resets attempts to 0, clears last_error)
queue()->retry($jobId);

// Delete a specific job (any status)
queue()->delete($jobId);

// Clear all jobs of a given status
queue()->clear('failed');
queue()->clear('completed');

// Prune completed jobs older than 7 days (default)
queue()->prune();

// Prune completed jobs older than 24 hours
queue()->prune(86400);

Pause and Resume

Pause state persists across requests via ProcessWire's cache. A paused queue stays paused until explicitly resumed — LazyCron ticks are also blocked.

queue()->pause();
queue()->isPaused(); // true — persists across requests

// Later, in another request or from the admin
queue()->resume();
queue()->isPaused(); // false

Useful during deployments, maintenance windows, or when debugging a misbehaving job handler.


Module Configuration

Navigate to Admin → Modules → Site → SimpleQueue → Configure.

Driver & Throughput

Setting Default Description
Queue Driver FileDriver FileDriver (JSON files) or DatabaseDriver (MySQL)
Jobs Per Run 10 Maximum jobs processed per LazyCron tick

LazyCron Settings

Setting Default Description
Process via LazyCron true Automatically process the queue on page loads
LazyCron Interval everyMinute How often to process: 30s, 1m, 2m, 5m, 15m, 30m, 1h

Pruning

Setting Default Description
Auto-Prune Completed Jobs true Daily cleanup of old completed jobs
Prune After (seconds) 604800 (7 days) Completed jobs older than this are removed

Storage Drivers

FileDriver (default)

Stores each job as a JSON file, organised into four sub-directories:

/site/assets/cache/SimpleWire/Queue/
  pending/       ← jobs waiting for execution
  processing/    ← jobs currently being executed
  completed/     ← successfully completed jobs
  failed/        ← permanently failed jobs
  • Zero database setup
  • Human-readable files — easy to inspect and debug
  • Sorted by priority weight (desc) then created_at (asc) when fetching next jobs
  • Best for: low-to-medium throughput, single-server setups

DatabaseDriver

Stores jobs in a simple_queue_jobs table, created automatically on first use.

  • Atomic UPDATE … WHERE status = 'pending' for safe concurrent locking
  • Indexed on status, execute_at, priority for efficient polling
  • Best for: high-throughput, multiple workers, or environments where files are impractical

Switching drivers: existing jobs in the old storage are not migrated. Clear the old queue before switching, or let it drain naturally.

Direct driver access:

$driver = queue()->driver; // QueueDriver instance

Complete Examples

Email Notification System

// /site/jobs/SendWelcomeEmailJob.php
<?php namespace ProcessWire;

class SendWelcomeEmailJob
{
    public function handle(array $data): void
    {
        $mail = wireMail();
        $mail->to($data['to']);
        $mail->subject($data['subject']);
        $mail->body($data['body'] ?? '');
        if (!empty($data['from'])) $mail->from($data['from']);

        if (!$mail->send()) {
            throw new \Exception("Failed to send email to {$data['to']}");
        }
    }
}
// Queue from a form handler or hook
queue()->push('SendWelcomeEmailJob', [
    'to'      => $newUser->email,
    'subject' => 'Welcome to the site!',
    'body'    => "Hi {$newUser->name}, thanks for joining.",
], ['priority' => 'high']);

Image Processing

// /site/jobs/GenerateThumbnailsJob.php
<?php namespace ProcessWire;

class GenerateThumbnailsJob
{
    public function handle(array $data): void
    {
        $page  = wire('pages')->get($data['page_id']);
        $image = $page->images->get($data['image_name']);

        if (!$image) {
            throw new \Exception("Image not found: {$data['image_name']}");
        }

        $image->size(1200, 800);
        $image->size(600, 400);
        $image->size(200, 150);
    }
}
// Queue after a page save hook
wire()->addHookAfter('Pages::saved', function($event) {
    $page = $event->arguments(0);
    if ($page->template != 'article') return;
    if (!$page->images->count()) return;

    queue()->push('GenerateThumbnailsJob', [
        'page_id'    => $page->id,
        'image_name' => $page->images->first()->basename,
    ], ['delay' => 5]);
});

Scheduled Report Generation

// /site/jobs/GenerateReportJob.php
<?php namespace ProcessWire;

class GenerateReportJob
{
    public function handle(array $data): void
    {
        $start = $data['period_start'];
        $end   = $data['period_end'];

        $orders = wire('pages')->find("template=order, created>=$start, created<=$end");
        $total  = 0;
        foreach ($orders as $order) $total += $order->total;

        // Store the result
        $report = wire('pages')->add('report', '/reports/', [
            'title'  => "Report {$data['label']}",
            'period' => "$start$end",
            'total'  => $total,
            'count'  => $orders->count,
        ]);
    }
}
// Schedule for first of next month at 02:00
queue()->push('GenerateReportJob', [
    'label'        => 'May 2026',
    'period_start' => '2026-05-01',
    'period_end'   => '2026-05-31',
], [
    'at'           => '2026-06-01 02:00:00',
    'max_attempts' => 2,
]);

Inspecting and Retrying Failed Jobs

$failed = queue()->failed(50);

foreach ($failed as $job) {
    echo "Job:      {$job->id}\n";
    echo "Handler:  {$job->handler}\n";
    echo "Attempts: {$job->attempts}/{$job->max_attempts}\n";
    echo "Error:    {$job->last_error}\n";
    echo "Failed:   " . date('Y-m-d H:i:s', $job->processed_at) . "\n\n";
}

// Retry all failed email jobs
foreach ($failed as $job) {
    if ($job->handler === 'SendWelcomeEmailJob') {
        queue()->retry($job->id);
    }
}

// Or clear them all
queue()->clear('failed');

Dashboard Stats

$stats = queue()->stats();

echo "Pending:    {$stats['pending']}\n";
echo "Processing: {$stats['processing']}\n";
echo "Completed:  {$stats['completed']}\n";
echo "Failed:     {$stats['failed']}\n";
echo "Total:      {$stats['total']}\n";

Manual Processing from a Template

// /site/templates/admin-queue.php
// Useful for cron-triggered processing outside of LazyCron
namespace ProcessWire;

$results = queue()->process(100);
header('Content-Type: application/json');
echo json_encode($results);

API Reference

Queue

Method Returns Description
push(string $handler, array $data, array $options) string|false Push a job. Returns job ID or false
process(int $limit = 10) array Process up to $limit pending jobs
get(string $id) Job|null Get a job by ID
delete(string $id) bool Delete a job by ID
retry(string $id) bool Reset a failed job to pending
pending(int $limit = 100) Job[] Get pending jobs
processing(int $limit = 100) Job[] Get in-progress jobs
completed(int $limit = 100) Job[] Get completed jobs
failed(int $limit = 100) Job[] Get failed jobs
stats() array Counts by status + total
clear(string $status) int Delete all jobs of a given status; returns count
prune(int $olderThan = 604800) int Delete old completed jobs; returns count
pause() void Halt processing for the current request
resume() void Re-enable processing for the current request
isPaused() bool Whether the queue is currently paused

Global Functions

Function Returns Description
queue() Queue Get the Queue instance
simplequeue() Queue Alias for queue()

Job Status Constants

Job::STATUS_PENDING    // 'pending'
Job::STATUS_PROCESSING // 'processing'
Job::STATUS_COMPLETED  // 'completed'
Job::STATUS_FAILED     // 'failed'

Job Priority Constants

Job::PRIORITY_HIGH   // 'high'
Job::PRIORITY_NORMAL // 'normal'
Job::PRIORITY_LOW    // 'low'

Troubleshooting

Jobs are not processing

  • Confirm Process via LazyCron is enabled in module settings
  • LazyCron requires page loads to fire — a site with no traffic won't process the queue automatically. For guaranteed processing, set up a real cron job that calls a processing template
  • Check the queue log in ProcessWire Admin → Logs for errors

Handler not found

  • Verify the file exists at /site/jobs/YourHandlerName.php
  • Confirm the class name matches the file name exactly (case-sensitive on Linux)
  • The class must be in the ProcessWire namespace or the global namespace

Jobs keep failing with the same error

  • Inspect the failed job: queue()->get($jobId)->last_error
  • Fix the underlying issue in your handler before retrying: queue()->retry($jobId)
  • Check the queue log for the full error history

Database table already exists error

  • If upgrading from an earlier version, the table may already exist — the CREATE TABLE IF NOT EXISTS in the current version handles this safely

Debugging

// Check queue state
$stats = queue()->stats();
bd($stats); // ProcessWire debug bar

// Inspect a specific job
$job = queue()->get('job_...');
bd($job->toArray());

// Log from a handler for debugging
wire('log')->save('queue-debug', json_encode($data));

License

This module is released under the MIT License.

About

Background job queue with priority, delayed execution, retry, and LazyCron-based processing

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages