diff --git a/resources/views/dashboard.blade.php b/resources/views/dashboard.blade.php index 8509647b..f4d77079 100644 --- a/resources/views/dashboard.blade.php +++ b/resources/views/dashboard.blade.php @@ -38,4 +38,6 @@ + + diff --git a/src/Livewire/QueueMonitor.php b/src/Livewire/QueueMonitor.php new file mode 100644 index 00000000..4abbb527 --- /dev/null +++ b/src/Livewire/QueueMonitor.php @@ -0,0 +1,28 @@ +remember(fn ($interval) => $query($interval, $this->queue, $this->connection))); + } +} diff --git a/src/PulseServiceProvider.php b/src/PulseServiceProvider.php index f398857c..1f98636e 100644 --- a/src/PulseServiceProvider.php +++ b/src/PulseServiceProvider.php @@ -68,6 +68,7 @@ public function register(): void Livewire\Exceptions::class => [Queries\Exceptions::class], Livewire\SlowRoutes::class => [Queries\SlowRoutes::class], Livewire\SlowQueries::class => [Queries\SlowQueries::class], + Livewire\QueueMonitor::class => [Queries\QueueMonitor::class], Livewire\SlowOutgoingRequests::class => [Queries\SlowOutgoingRequests::class], Livewire\Cache::class => [Queries\CacheInteractions::class, Queries\MonitoredCacheInteractions::class], ] as $card => $queries) { @@ -224,6 +225,7 @@ protected function registerComponents(): void $livewire->component('pulse.queues', Livewire\Queues::class); $livewire->component('pulse.servers', Livewire\Servers::class); $livewire->component('pulse.slow-jobs', Livewire\SlowJobs::class); + $livewire->component('queue-monitor', Livewire\QueueMonitor::class); $livewire->component('pulse.exceptions', Livewire\Exceptions::class); $livewire->component('pulse.slow-routes', Livewire\SlowRoutes::class); $livewire->component('pulse.slow-queries', Livewire\SlowQueries::class); diff --git a/src/Queries/QueueMonitor.php b/src/Queries/QueueMonitor.php new file mode 100644 index 00000000..960ad34b --- /dev/null +++ b/src/Queries/QueueMonitor.php @@ -0,0 +1,103 @@ +config->get('queue.default'); + + $maxDataPoints = 60; + + $currentBucket = CarbonImmutable::createFromTimestamp( + floor($now->getTimestamp() / ($interval->totalSeconds / $maxDataPoints)) * ($interval->totalSeconds / $maxDataPoints) + ); + + $secondsPerPeriod = (int) ($interval->totalSeconds / $maxDataPoints); + + $padding = collect([]) + ->pad(60, null) + ->map(fn ($value, $i) => (object) [ + 'date' => $currentBucket->subSeconds($i * $secondsPerPeriod)->format('Y-m-d H:i'), + 'size' => null, + 'failed' => null, + ]) + ->reverse() + ->keyBy('date'); + + return $this->connection() + ->query() + ->selectRaw('bucket, MAX(`size`) AS `size`, MAX(`failed`) AS `failed`') + ->fromSub( + fn ($query) => $query + ->from('pulse_queue_sizes') + ->selectRaw('size, failed, date, FLOOR(UNIX_TIMESTAMP(CONVERT_TZ(`date`, ?, @@session.time_zone)) / ?) AS `bucket`', [$now->format('P'), $secondsPerPeriod]) + ->where('date', '>=', $now->ceilSeconds($interval->totalSeconds / $maxDataPoints)->subSeconds((int) $interval->totalSeconds)) + ->where('queue', $queue) + ->when($connection, fn ($query) => $query->where('connection', $connection)), + 'grouped' + ) + ->groupBy('bucket') + ->orderByDesc('bucket') + ->get() + ->reverse() + ->pipe(function ($readings) use ($secondsPerPeriod, $padding) { + $readings = $readings->keyBy(fn ($reading) => CarbonImmutable::createFromTimestamp($reading->bucket * $secondsPerPeriod)->format('Y-m-d H:i')); + + $readings = $padding->merge($readings)->values(); + + $previousFailed = 0; + + return $readings->each(function ($reading) use (&$previousFailed) { + if ($reading->failed === null) { + $previousFailed = 0; + + return; + } + + if ($previousFailed === 0 || $previousFailed > $reading->failed) { + $previousFailed = $reading->failed; + + $reading->failed = null; + + return; + } + + with($reading->failed, function ($newPreviousFailed) use ($reading, &$previousFailed) { + $reading->failed = $reading->failed - $previousFailed; + + $previousFailed = $newPreviousFailed; + }); + }); + }) + ->pipe(fn ($readings) => (object) [ + 'readings' => $readings->map(fn ($reading) => (object) [ + 'size' => $reading->size, + 'failed' => $reading->failed, + ])->all(), + ]); + } +}