Enterprise-grade, queue-first background file processing for Laravel.
- Requirements
- Installation
- Configuration
- Quick Start
- File Type Processors
- Queue & Priority
- Batch Processing
- Job Chaining
- Retry Policies
- Status Tracking
- Real-time Progress (Broadcasting)
- Notifications
- Multi-Tenancy
- Custom Processors
- Artisan Commands
- API Endpoints
- Monitoring
- Scalability
- Testing
- Architecture
| Requirement | Version |
|---|---|
| PHP | 8.2+ |
| Laravel | 10 / 11 / 12 / 13 |
| Database | MySQL 8+ / PostgreSQL 14+ |
| Queue | Redis (recommended) / Database / SQS |
| Optional | |
| FFmpeg | Video / Audio processing |
| LibreOffice | Word / Excel / PowerPoint conversion |
| Imagick | Advanced image processing |
| GhostScript | PDF compression |
| pdftotext | PDF text extraction |
composer require mostafax/background-processing-enginephp artisan bpe:installThis single command will:
- Publish the config file to
config/bpe.php - Publish and run the 5 database migrations
- Display usage examples
# Config only
php artisan vendor:publish --tag=bpe-config
# Migrations only
php artisan vendor:publish --tag=bpe-migrations
php artisan migrateAdd your queue workers to config/horizon.php (recommended) or use:
# Basic worker (development)
php artisan queue:work --queue=bpe-high,bpe-default,bpe-low,bpe-batch
# With Horizon (production)
php artisan horizonAfter publishing, edit config/bpe.php:
return [
'queue' => [
'prefix' => env('BPE_QUEUE_PREFIX', 'bpe'),
'topology' => [
'high' => env('BPE_QUEUE_HIGH', 'bpe-high'),
'default' => env('BPE_QUEUE_DEFAULT', 'bpe-default'),
'low' => env('BPE_QUEUE_LOW', 'bpe-low'),
'batch' => env('BPE_QUEUE_BATCH', 'bpe-batch'),
],
],
'storage' => [
'default' => env('BPE_STORAGE_DISK', 'local'), // s3, minio, local, ftp
'temp' => env('BPE_TEMP_DIR', sys_get_temp_dir()),
],
'drivers' => [
'ffmpeg' => [
'binaries' => [
'ffmpeg' => env('BPE_FFMPEG_PATH', 'ffmpeg'),
'ffprobe' => env('BPE_FFPROBE_PATH', 'ffprobe'),
],
],
'libreoffice' => [
'binary' => env('BPE_LIBREOFFICE_PATH', 'libreoffice'),
],
],
'cache' => [
'enabled' => true,
'store' => env('BPE_CACHE_STORE', 'redis'), // redis, file, array
'ttl' => 300, // seconds
],
'retry' => [
'default_attempts' => 3,
'default_backoff' => [30, 60, 120], // seconds
],
'pruning' => [
'enabled' => true,
'keep_days' => 30,
],
];Add these to your .env:
# Storage
BPE_STORAGE_DISK=local # local | s3 | minio
# Queue
BPE_QUEUE_PREFIX=bpe
BPE_CACHE_STORE=file # redis | file | array
# Drivers (optional — only needed for specific file types)
BPE_FFMPEG_PATH=/usr/bin/ffmpeg
BPE_LIBREOFFICE_PATH=/usr/bin/libreoffice
# Security
BPE_MAX_FILE_SIZE=524288000 # 500 MB in bytesAdd the facade alias (auto-registered via Laravel auto-discovery):
use Mostafax\BPE\Support\Facades\BPE as FileProcessor;Or use the helper function:
bpe($file)->dispatch();
bpe()->image($file)->resize(800, 600)->dispatch();$result = FileProcessor::image($request->file('photo'))
->resize(1920, 1080)
->compress(85)
->dispatch();
// $result->taskId — UUID to track the job
// $result->status — 'dispatched'
// $result->queueNameSupported MIME types: image/jpeg, image/png, image/webp, image/gif, image/bmp, image/tiff, image/avif
Requires: GD (built-in) or Imagick extension
FileProcessor::image($file)
->resize(width: 1920, height: 1080, mode: 'fit') // mode: fit | fill | force
->compress(quality: 85) // 1-100
->convert('webp') // jpg | png | webp | gif
->watermark('/path/to/logo.png', position: 'bottom-right', opacity: 0.5)
->crop(width: 800, height: 600, x: 0, y: 0)
->optimize() // strip metadata + compress
->queue('bpe-high')
->disk('s3')
->dispatch();Supported MIME types: video/mp4, video/avi, video/quicktime, video/webm, video/3gpp
Requires: FFmpeg
FileProcessor::video($file)
->generateThumbnail(at: '00:00:05', width: 640) // extract frame as image
->compress(crf: 23, preset: 'medium') // H.264, CRF 0-51
->convert('mp4', codec: 'h264') // mp4 | webm | avi
->extractAudio(format: 'mp3') // strip audio track
->trim(start: '00:00:10', end: '00:02:00') // cut video segment
->watermark('/path/to/logo.png')
->queue('bpe-default')
->dispatch();Supported MIME types: application/pdf
Requires: GhostScript (compress), pdftotext (extract text), pdftk (split/merge), Imagick (toImage)
FileProcessor::pdf($file)
->compress() // reduce file size via GhostScript
->extractText() // → .txt file
->split(pages: [1, 3, 5]) // extract specific pages
->merge([$pdf2, $pdf3]) // combine PDFs
->watermark(text: 'CONFIDENTIAL', opacity: 0.3)
->toImage(format: 'jpg', density: 150) // first page → image
->queue('bpe-default')
->dispatch();Supported MIME types: application/msword, application/vnd.openxmlformats-officedocument.wordprocessingml.document
Requires: LibreOffice
FileProcessor::word($file)
->convertToPDF() // .docx → .pdf
->convertToHTML() // .docx → .html
->extractText() // .docx → .txt
->queue('bpe-default')
->dispatch();Supported MIME types: application/vnd.ms-excel, application/vnd.openxmlformats-officedocument.spreadsheetml.sheet
Requires: LibreOffice
FileProcessor::excel($file)
->convertToCSV() // .xlsx → .csv
->convertToPDF() // .xlsx → .pdf
->parseData() // .xlsx → .json (structured rows)
->queue('bpe-batch')
->dispatch();Supported MIME types: text/csv, text/plain, application/csv
FileProcessor::csv($file)
->validate([
'columns' => ['name', 'email', 'age'], // required columns
'max_rows' => 100000,
])
->transform([
'map' => ['name' => 'full_name', 'email' => 'email_address'],
])
->import([
'table' => 'users', // import directly to DB table
'chunk_size' => 500,
])
->queue('bpe-batch')
->dispatch();BPE uses 4 priority queues. Workers poll them in weight order.
bpe-high (weight: 10) → user-facing, SLA-bound
bpe-default (weight: 5) → standard async tasks
bpe-low (weight: 2) → non-urgent
bpe-batch (weight: 1) → nightly / bulk jobs
use Mostafax\BPE\Domain\Processing\ValueObjects\Priority;
FileProcessor::image($file)
->priority(Priority::HIGH) // sets queue automatically
// OR manually:
->queue('bpe-high')
->dispatch();Horizon configuration:
// config/horizon.php
'environments' => [
'production' => [
'bpe-high-supervisor' => [
'connection' => 'redis',
'queue' => ['bpe-high'],
'balance' => 'auto',
'minProcesses' => 5,
'maxProcesses' => 30,
'timeout' => 3600,
],
'bpe-batch-supervisor' => [
'connection' => 'redis',
'queue' => ['bpe-batch'],
'processes' => 3,
'timeout' => 7200,
],
],
],Process multiple files in one call:
$files = $request->file('files'); // array of UploadedFile
$tasks = collect($files)->map(function ($file, $index) {
return FileProcessor::process($file)
->queue('bpe-batch')
->priority(Priority::BATCH)
->withMetadata(['batch_index' => $index])
->dispatch();
});
// Each $task has a unique taskId you can track independentlyRun processors sequentially — each step waits for the previous:
// Compress video, then generate thumbnail from compressed version
FileProcessor::video($video)
->compress(crf: 23)
->queue('bpe-default')
->dispatch();
// Then in the TaskCompleted listener, dispatch the next step:
Event::listen(TaskCompleted::class, function ($event) {
$result = $event->result;
FileProcessor::image(FileReference::fromPath($result['thumbnail_path']))
->resize(640, 360)
->dispatch();
});use Mostafax\BPE\Domain\Processing\ValueObjects\RetryPolicy;
FileProcessor::video($file)
// Custom: 5 attempts, increasing backoff
->retry(attempts: 5, backoff: [10, 30, 60, 120, 300])
->dispatch();Built-in policies:
RetryPolicy::default() // 3 attempts, [30, 60, 120] seconds
RetryPolicy::aggressive() // 5 attempts, [10, 30, 60, 120, 300] seconds
RetryPolicy::noRetry() // 1 attempt, fail immediately// Facade
$status = FileProcessor::status($taskId);
// Returns: ProcessingTaskDTO
echo $status->status; // 'pending' | 'processing' | 'completed' | 'failed'
echo $status->progress; // 0-100
echo $status->errorMessage; // null | string
print_r($status->result); // ['path' => '...', 'disk' => '...', ...]
// Via HTTP API
GET /api/bpe/tasks/{taskId}
// Response:
{
"data": {
"uuid": "abc-123...",
"status": "completed",
"progress": 100,
"file_type": "image",
"queue_name": "bpe-high",
"result": { "path": "bpe-results/2024/01/01/abc.webp", "disk": "s3" },
"started_at": "2024-01-01T12:00:00Z",
"completed_at": "2024-01-01T12:00:03Z"
}
}The package automatically broadcasts events on the public channel bpe.task.{taskId}.
Laravel Echo (frontend):
import Echo from 'laravel-echo';
import Pusher from 'pusher-js';
window.Pusher = Pusher;
window.Echo = new Echo({ broadcaster: 'pusher', key: process.env.MIX_PUSHER_APP_KEY });
const taskId = 'abc-123-...';
Echo.channel(`bpe.task.${taskId}`)
.listen('.task.started', (e) => {
console.log('Started:', e);
})
.listen('.task.progress', (e) => {
progressBar.style.width = `${e.progress}%`;
label.textContent = `${e.step} — ${e.progress}%`;
})
.listen('.task.completed', (e) => {
showResult(e.result);
})
.listen('.task.failed', (e) => {
showError(e.message);
});Broadcast event payloads:
| Event | Payload |
|---|---|
task.started |
{ task_id, status: 'processing' } |
task.progress |
{ task_id, progress: 0-100, step: 'compress', status } |
task.completed |
{ task_id, status: 'completed', result: {...} } |
task.failed |
{ task_id, status: 'failed', message: '...' } |
FileProcessor::image($file)
->notify(channels: ['mail', 'slack', 'webhook'])
->dispatch();Configure notification channels in config/bpe.php:
'notifications' => [
'channels' => ['mail'],
'mail' => ['from' => 'noreply@yourapp.com'],
'slack' => ['webhook' => env('SLACK_WEBHOOK_URL')],
'webhook' => ['url' => env('BPE_WEBHOOK_URL')],
],All tasks are automatically scoped by tenant_id:
// Scope to a specific tenant
FileProcessor::forTenant($tenantId)
->image($file)
->resize(1920, 1080)
->dispatch();
// Query scoped to tenant
ProcessingTask::forTenant($tenantId)->where('status', 'completed')->get();Add the HasProcessingTasks trait to your Tenant or User model:
use Mostafax\BPE\Support\Traits\HasProcessingTasks;
class User extends Authenticatable
{
use HasProcessingTasks;
}
// Then:
$user->processingTasks()->get();
$user->completedTasks()->count();Register any custom file type:
// 1. Create your processor
namespace App\Processors;
use Mostafax\BPE\Processors\AbstractProcessor;
use Mostafax\BPE\Contracts\ProcessorStepInterface;
use Mostafax\BPE\Application\DTOs\ProcessingResultDTO;
use Mostafax\BPE\Domain\Processing\ValueObjects\FileReference;
class DicomProcessor extends AbstractProcessor
{
public function supportedMimeTypes(): array
{
return ['application/dicom'];
}
public function fileType(): string { return 'dicom'; }
protected function resolveStep(string $name): ProcessorStepInterface
{
return match ($name) {
'anonymize' => new AnonymizeDicomStep(),
'convert' => new ConvertDicomStep(),
};
}
protected function buildResult(FileReference $originalFile, string $outputPath): ProcessingResultDTO
{
return new ProcessingResultDTO(
path: $outputPath,
disk: $originalFile->disk(),
mimeType: 'image/jpeg',
size: filesize($outputPath),
);
}
}// 2. Register in AppServiceProvider
use Mostafax\BPE\Support\Facades\BPE as FileProcessor;
public function boot(): void
{
FileProcessor::extend('dicom', \App\Processors\DicomProcessor::class);
}// 3. Use it
FileProcessor::dicom($file)
->anonymize()
->convert('jpg')
->dispatch();# Install the package (publish + migrate)
php artisan bpe:install
# Show processing statistics
php artisan bpe:status
# Retry failed tasks
php artisan bpe:retry {uuid}
php artisan bpe:retry --all-failed
# Prune old tasks
php artisan bpe:prune --days=30
# Run demo (no real file needed)
php artisan bpe:demo --type=image
php artisan bpe:demo --type=csv
php artisan bpe:demo --queue
php artisan bpe:demo --statsThe package registers these routes under api/bpe/*:
| Method | Endpoint | Description |
|---|---|---|
GET |
/api/bpe/tasks/{id} |
Get task status |
POST |
/api/bpe/tasks/{id}/cancel |
Cancel task |
POST |
/api/bpe/tasks/{id}/retry |
Retry failed task |
GET |
/api/bpe/health |
Health check |
Control which routes are enabled in config/bpe.php:
'api' => [
'enabled' => true,
'prefix' => 'api/bpe',
'middleware' => ['api', 'auth:sanctum'],
],Example HTTP responses:
BPE integrates with Horizon out of the box. No extra setup needed. All jobs appear under their respective queues in the Horizon dashboard.
php artisan bpe:statusBPE Processing Statistics
┌─────────────┬───────┐
│ Status │ Count │
├─────────────┼───────┤
│ pending │ 12 │
│ dispatched │ 3 │
│ processing │ 2 │
│ completed │ 891 │
│ failed │ 4 │
│ cancelled │ 1 │
└─────────────┴───────┘
All processing data is persisted in 5 tables:
| Table | Purpose |
|---|---|
bpe_tasks |
Main task record — status, pipeline, retry info |
bpe_results |
Output files — URLs, dimensions, metadata |
bpe_logs |
Per-task log entries with stage tracking |
bpe_events |
Immutable event store (audit trail) |
bpe_metrics |
Time-series performance metrics |
- 1 Redis instance + 5 Horizon workers
- Single MySQL + BPE with default config
- Redis Sentinel (2 replicas)
- 20-30 Horizon workers across 3-5 servers
- MySQL + Read Replica
- Redis Cluster (6 shards)
- 100+ Kubernetes workers (HPA on queue depth)
- PostgreSQL + Citus horizontal sharding
Use the built-in fake:
use Mostafax\BPE\Support\Facades\BPE as FileProcessor;
// In your test setUp()
FileProcessor::fake(); // disables real processing; queues are captured
// Act
$result = FileProcessor::image($fakeFile)->resize(800, 600)->dispatch();
// Assert
$this->assertDatabaseHas('bpe_tasks', [
'uuid' => $result->taskId,
'status' => 'dispatched',
'type' => 'image',
]);BPE follows Clean Architecture + DDD with 8 Bounded Contexts:
Domain Layer (ProcessingTask aggregate, Value Objects, Domain Events)
↑ depends on nothing external
Application Layer (PendingProcess builder, Commands, Queries, DTOs)
↑ depends on Domain
Infrastructure Layer (Eloquent Repositories, Queue, Storage, Drivers)
↑ depends on Application contracts
Interface Layer (HTTP Controllers, Artisan Commands, Broadcast Events)
↑ depends on Application
Key design patterns:
- Aggregate Root —
ProcessingTaskowns all state transitions - Value Objects —
Priority,RetryPolicy,ProcessingPipelineare immutable - Repository Pattern —
CachedTaskRepositorywrapsEloquentTaskRepository - Strategy Pattern — Processors and Steps are swappable implementations
- Fluent Builder —
PendingProcessaccumulates configuration before dispatching - Event Sourcing — All state changes recorded in
bpe_events
MIT © Mostafa — mostafa.m.elbiar2@gmail.com