From da5201632e2e1bd66c6803994d7125c76599130d Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Fri, 11 Aug 2023 17:02:51 +1000 Subject: [PATCH 01/26] wip --- config/pulse.php | 19 +++++++++-- src/Commands/CheckCommand.php | 2 +- src/Commands/RestartCommand.php | 2 +- src/Commands/WorkCommand.php | 9 +++--- src/Contracts/Ingest.php | 13 +++++--- src/Contracts/Storage.php | 18 +++++++++++ src/Entries/JobFinished.php | 5 +-- src/Entries/JobStarted.php | 6 ++-- src/Entries/Update.php | 11 ++----- src/Ingests/Database.php | 56 --------------------------------- src/Ingests/Redis.php | 28 +++++------------ src/Ingests/Storage.php | 42 +++++++++++++++++++++++++ src/Pulse.php | 15 ++++----- src/PulseServiceProvider.php | 29 +++++++++++++++-- src/Storage/Database.php | 47 +++++++++++++++++++++++++++ 15 files changed, 191 insertions(+), 111 deletions(-) create mode 100644 src/Contracts/Storage.php delete mode 100644 src/Ingests/Database.php create mode 100644 src/Ingests/Storage.php create mode 100644 src/Storage/Database.php diff --git a/config/pulse.php b/config/pulse.php index 7c760110..cb225672 100644 --- a/config/pulse.php +++ b/config/pulse.php @@ -17,8 +17,23 @@ // This must be unique for each reporting server. 'server_name' => env('PULSE_SERVER_NAME', gethostname()), - // 'ingest' => Laravel\Pulse\Ingests\Database::class, - 'ingest' => Laravel\Pulse\Ingests\Redis::class, + 'storage' => [ + 'driver' => env('PULSE_STORAGE_DRIVER', 'database'), + + 'database' => [ + 'connection' => env('PULSE_DB_CONNECTION') ?? env('DB_CONNECTION') ?? 'mysql', + ], + ], + + 'ingest' => [ + 'driver' => env('PULSE_INGEST_DRIVER', 'storage'), + + 'storage' => [], + + 'redis' => [ + 'connection' => env('PULSE_REDIS_CONNECTION') ?? 'default', + ], + ], // TODO: filter configuration? // TODO: trim lottery configuration diff --git a/src/Commands/CheckCommand.php b/src/Commands/CheckCommand.php index 786aaa98..7e49890b 100644 --- a/src/Commands/CheckCommand.php +++ b/src/Commands/CheckCommand.php @@ -76,7 +76,7 @@ public function handle(): int ])->toJson(), ]; - // TODO: do this via the injest + // TODO: do this via the ingest DB::table('pulse_servers')->insert($stats); $this->line('[system stats] '.json_encode($stats)); diff --git a/src/Commands/RestartCommand.php b/src/Commands/RestartCommand.php index 19d2e953..afa72ebd 100644 --- a/src/Commands/RestartCommand.php +++ b/src/Commands/RestartCommand.php @@ -5,7 +5,7 @@ use Illuminate\Console\Command; use Illuminate\Support\Facades\Cache; use Illuminate\Support\InteractsWithTime; -use Laravel\Pulse\Ingests\Database; +use Laravel\Pulse\Ingests\Storage; use Laravel\Pulse\Ingests\Redis; use Symfony\Component\Console\Attribute\AsCommand; diff --git a/src/Commands/WorkCommand.php b/src/Commands/WorkCommand.php index c035ae49..9a28452f 100644 --- a/src/Commands/WorkCommand.php +++ b/src/Commands/WorkCommand.php @@ -6,7 +6,8 @@ use Illuminate\Console\Command; use Illuminate\Support\Facades\Cache; use Illuminate\Support\Sleep; -use Laravel\Pulse\Ingests\Database; +use Laravel\Pulse\Contracts\Ingest; +use Laravel\Pulse\Contracts\Storage; use Laravel\Pulse\Ingests\Redis; use Symfony\Component\Console\Attribute\AsCommand; @@ -30,7 +31,7 @@ class WorkCommand extends Command /** * Handle the command. */ - public function handle(Redis $redisIngest, Database $databaseIngest): int + public function handle(Ingest $ingest, Storage $storage): int { $lastRestart = Cache::get('illuminate:pulse:restart'); @@ -48,12 +49,12 @@ public function handle(Redis $redisIngest, Database $databaseIngest): int if ($now->subMinute()->greaterThan($lastTrimmedDatabaseAt)) { $this->comment('Trimming the database at '.$now->toDateTimeString()); - $databaseIngest->trim($now->subWeek()); + $storage->trim($now->subWeek()); $lastTrimmedDatabaseAt = $now; } - $processed = $redisIngest->processEntries(1000); + $processed = $ingest->store($storage, 1000); if ($processed === 0) { $this->comment('Queue finished processing. Sleeping at '.$now->toDateTimeString()); diff --git a/src/Contracts/Ingest.php b/src/Contracts/Ingest.php index c14750fa..2a29c020 100644 --- a/src/Contracts/Ingest.php +++ b/src/Contracts/Ingest.php @@ -7,12 +7,17 @@ interface Ingest { /** - * Ingest the entries and updates without throwing exceptions. + * Ingest the entries and updates. */ - public function ingestSilently(array $entries, array $updates): void; + public function ingest(array $entries, array $updates): void; /** - * Trim the ingest without throwing exceptions. + * Trim the ingested entries. */ - public function trimSilently(CarbonImmutable $oldest): void; + public function trim(CarbonImmutable $oldest): void; + + /** + * Store the ingested entries. + */ + public function store(Storage $storage, int $count): int; } diff --git a/src/Contracts/Storage.php b/src/Contracts/Storage.php new file mode 100644 index 00000000..593d13ac --- /dev/null +++ b/src/Contracts/Storage.php @@ -0,0 +1,18 @@ +query() + $db->table($this->table()) ->where('job_id', $this->jobId) ->update([ 'duration' => DB::raw('TIMESTAMPDIFF(MICROSECOND, `processing_started_at`, "'.$this->endedAt.'") / 1000'), diff --git a/src/Entries/JobStarted.php b/src/Entries/JobStarted.php index 759c77f0..be3675b6 100644 --- a/src/Entries/JobStarted.php +++ b/src/Entries/JobStarted.php @@ -2,6 +2,8 @@ namespace Laravel\Pulse\Entries; +use Illuminate\Database\Connection; + class JobStarted extends Update { /** @@ -17,9 +19,9 @@ public function __construct( /** * Perform the update. */ - public function perform(): void + public function perform(Connection $db): void { - $this->query() + $db->table($this->table()) ->where('job_id', $this->jobId) ->update([ 'processing_started_at' => $this->startedAt, diff --git a/src/Entries/Update.php b/src/Entries/Update.php index 304d146f..4fecdbff 100644 --- a/src/Entries/Update.php +++ b/src/Entries/Update.php @@ -2,6 +2,7 @@ namespace Laravel\Pulse\Entries; +use Illuminate\Database\Connection; use Illuminate\Database\Query\Builder; use Illuminate\Support\Facades\DB; @@ -15,7 +16,7 @@ abstract public function table(): string; /** * Perform the update. */ - abstract public function perform(): void; + abstract public function perform(Connection $db): void; /** * Determine if the update is the given type. @@ -32,12 +33,4 @@ public function type(): Type { return Type::from($this->table()); } - - /** - * The table query. - */ - protected function query(): Builder - { - return DB::table($this->table()); - } } diff --git a/src/Ingests/Database.php b/src/Ingests/Database.php deleted file mode 100644 index 3346638e..00000000 --- a/src/Ingests/Database.php +++ /dev/null @@ -1,56 +0,0 @@ - $this->ingest($entries, $updates), report: false); - } - - /** - * Ingest the entries and updates. - */ - public function ingest(array $entries, array $updates): void - { - if ($entries === [] && $updates === []) { - return; - } - - DB::transaction(function () use ($entries, $updates) { - collect($entries)->each(fn ($rows, $table) => collect($rows) - ->chunk(1000) - ->map->all() - ->each(DB::table($table)->insert(...))); - - collect($updates)->each(fn ($update) => $update->perform()); - }); - } - - /** - * Trim the ingest without throwing exceptions. - */ - public function trimSilently(CarbonImmutable $oldest): void - { - rescue(fn () => $this->trim($oldest), report: false); - } - - /** - * Trim the ingest. - */ - public function trim(CarbonImmutable $oldest): void - { - Type::all()->each(fn (Type $type) => DB::table($type->value) - ->where('date', '<', $oldest->toDateTimeString()) - ->delete()); - } -} diff --git a/src/Ingests/Redis.php b/src/Ingests/Redis.php index f2e9830c..15504576 100644 --- a/src/Ingests/Redis.php +++ b/src/Ingests/Redis.php @@ -4,8 +4,10 @@ use Carbon\CarbonImmutable; use Laravel\Pulse\Contracts\Ingest; +use Laravel\Pulse\Contracts\Storage; use Laravel\Pulse\Entries\Update; use Laravel\Pulse\Redis as RedisConnection; +use Laravel\Pulse\Storage\Database; class Redis implements Ingest { @@ -17,19 +19,11 @@ class Redis implements Ingest /** * Create a new Redis ingest instance. */ - public function __construct(protected RedisConnection $connection, protected Database $db) + public function __construct(protected RedisConnection $connection) { // } - /** - * Ingest the entries and updates without throwing exceptions. - */ - public function ingestSilently(array $entries, array $updates): void - { - rescue(fn () => $this->ingest($entries, $updates), report: false); - } - /** * Ingest the entries and updates. */ @@ -54,15 +48,7 @@ public function ingest(array $entries, array $updates): void } /** - * Trim the ingest without throwing exceptions. - */ - public function trimSilently(CarbonImmutable $oldest): void - { - rescue(fn () => $this->trim($oldest), report: false); - } - - /** - * Trim the ingest. + * Trim the ingested entries. */ public function trim(CarbonImmutable $oldest): void { @@ -70,9 +56,9 @@ public function trim(CarbonImmutable $oldest): void } /** - * Process the items on the Redis stream and persist in the database. + * Store the ingested entries. */ - public function processEntries(int $count): int + public function store(Storage $storage, int $count): int { $entries = collect($this->connection->xrange($this->stream, '-', '+', $count)); @@ -93,7 +79,7 @@ public function processEntries(int $count): int $updates = $updates ->map(fn ($data): Update => unserialize($data['data'])); - $this->db->ingest($inserts->all(), $updates->all()); + $storage->store($inserts->all(), $updates->all()); $this->connection->xdel($this->stream, $keys->all()); diff --git a/src/Ingests/Storage.php b/src/Ingests/Storage.php new file mode 100644 index 00000000..afa773ad --- /dev/null +++ b/src/Ingests/Storage.php @@ -0,0 +1,42 @@ +storage->store($entries, $updates); + } + + /** + * Trim the ingested entries. + */ + public function trim(CarbonImmutable $oldest): void + { + $this->storage->trim($oldest); + } + + /** + * Store the ingested entries. + */ + public function store(Storage $store, int $count): int + { + throw new RuntimeException('The storage ingest driver does not need to process entries.'); + } +} diff --git a/src/Pulse.php b/src/Pulse.php index fe4919d0..7399a280 100644 --- a/src/Pulse.php +++ b/src/Pulse.php @@ -136,16 +136,17 @@ public function recordUpdate(Update $update): void */ public function store(): void { - $this->ingest->ingestSilently( + // TODO: logging? report? + rescue(fn () => $this->ingest->ingest( $this->entriesQueue, $this->updatesQueue, - ); - - $this->entriesQueue = $this->updatesQueue = []; + ), report: false); // TODO: lottery configuration? - Lottery::odds(1, 100) - ->winner(fn () => $this->ingest->trimSilently((new CarbonImmutable)->subWeek())) - ->choose(); + rescue(fn () => Lottery::odds(1, 100) + ->winner(fn () => $this->ingest->trim((new CarbonImmutable)->subWeek())) + ->choose(), report: false); + + $this->entriesQueue = $this->updatesQueue = []; } /** diff --git a/src/PulseServiceProvider.php b/src/PulseServiceProvider.php index eec480eb..5e428062 100644 --- a/src/PulseServiceProvider.php +++ b/src/PulseServiceProvider.php @@ -20,7 +20,9 @@ use Laravel\Pulse\Commands\CheckCommand; use Laravel\Pulse\Commands\RestartCommand; use Laravel\Pulse\Commands\WorkCommand; +use Laravel\Pulse\Contracts\Ingest; use Laravel\Pulse\Contracts\ShouldNotReportUsage; +use Laravel\Pulse\Contracts\Storage; use Laravel\Pulse\Handlers\HandleCacheInteraction; use Laravel\Pulse\Handlers\HandleException; use Laravel\Pulse\Handlers\HandleHttpRequest; @@ -40,6 +42,9 @@ use Laravel\Pulse\Http\Livewire\SlowRoutes; use Laravel\Pulse\Http\Livewire\Usage; use Laravel\Pulse\Http\Middleware\Authorize; +use Laravel\Pulse\Ingests\Redis as RedisIngest; +use Laravel\Pulse\Ingests\Storage as StorageIngest; +use Laravel\Pulse\Storage\Database; use Laravel\Pulse\View\Components\Pulse as PulseComponent; use Livewire\Livewire; @@ -54,9 +59,29 @@ public function register(): void return; } - $this->app->singleton(Pulse::class, fn ($app) => new Pulse($app[config('pulse.ingest')])); + $this->app->singleton(Pulse::class); - $this->app->scoped(Redis::class, fn () => new Redis(app('redis')->connection())); + $this->app->bind(Storage::class, function ($app) { + $driver = config('pulse.storage.driver'); + + $config = config("pulse.storage.{$driver}"); + + return new Database($config, $app['db']->connection($config['connection'])); + }); + + $this->app->bind(Ingest::class, function ($app) { + $driver = config('pulse.ingest.driver'); + + if ($driver === 'storage') { + return $app[StorageIngest::class]; + } + + $config = config("pulse.ingest.{$driver}"); + + return new RedisIngest($config, $app['redis']->connection($config['connection'])); + }); + + // $this->app->scoped(RedisIngest::class, fn () => new RedisIngest(app('redis')->connection())); $this->mergeConfigFrom( __DIR__.'/../config/pulse.php', 'pulse' diff --git a/src/Storage/Database.php b/src/Storage/Database.php new file mode 100644 index 00000000..6377f0dc --- /dev/null +++ b/src/Storage/Database.php @@ -0,0 +1,47 @@ +db->transaction(function () use ($entries, $updates) { + collect($entries)->each(fn ($rows, $table) => collect($rows) + ->chunk(1000) + ->map->all() + ->each($this->db->table($table)->insert(...))); + + collect($updates)->each(fn ($update) => $update->perform($db)); + }); + } + + /** + * Trim the ingest. + */ + public function trim(CarbonImmutable $oldest): void + { + Type::all()->each(fn (Type $type) => $this->db->table($type->value) + ->where('date', '<', $oldest->toDateTimeString()) + ->delete()); + } +} From 6957629275b323b48b1833de3ad159b5df37f508 Mon Sep 17 00:00:00 2001 From: timacdonald Date: Fri, 11 Aug 2023 07:03:19 +0000 Subject: [PATCH 02/26] Fix code styling --- src/Commands/RestartCommand.php | 2 -- src/Commands/WorkCommand.php | 1 - src/Entries/Update.php | 2 -- src/Ingests/Redis.php | 1 - src/Ingests/Storage.php | 2 -- src/Storage/Database.php | 1 - 6 files changed, 9 deletions(-) diff --git a/src/Commands/RestartCommand.php b/src/Commands/RestartCommand.php index afa72ebd..23120412 100644 --- a/src/Commands/RestartCommand.php +++ b/src/Commands/RestartCommand.php @@ -5,8 +5,6 @@ use Illuminate\Console\Command; use Illuminate\Support\Facades\Cache; use Illuminate\Support\InteractsWithTime; -use Laravel\Pulse\Ingests\Storage; -use Laravel\Pulse\Ingests\Redis; use Symfony\Component\Console\Attribute\AsCommand; #[AsCommand(name: 'pulse:restart')] diff --git a/src/Commands/WorkCommand.php b/src/Commands/WorkCommand.php index 9a28452f..51381b04 100644 --- a/src/Commands/WorkCommand.php +++ b/src/Commands/WorkCommand.php @@ -8,7 +8,6 @@ use Illuminate\Support\Sleep; use Laravel\Pulse\Contracts\Ingest; use Laravel\Pulse\Contracts\Storage; -use Laravel\Pulse\Ingests\Redis; use Symfony\Component\Console\Attribute\AsCommand; #[AsCommand(name: 'pulse:work')] diff --git a/src/Entries/Update.php b/src/Entries/Update.php index 4fecdbff..a35b6d22 100644 --- a/src/Entries/Update.php +++ b/src/Entries/Update.php @@ -3,8 +3,6 @@ namespace Laravel\Pulse\Entries; use Illuminate\Database\Connection; -use Illuminate\Database\Query\Builder; -use Illuminate\Support\Facades\DB; abstract class Update { diff --git a/src/Ingests/Redis.php b/src/Ingests/Redis.php index 15504576..1d1de8e2 100644 --- a/src/Ingests/Redis.php +++ b/src/Ingests/Redis.php @@ -7,7 +7,6 @@ use Laravel\Pulse\Contracts\Storage; use Laravel\Pulse\Entries\Update; use Laravel\Pulse\Redis as RedisConnection; -use Laravel\Pulse\Storage\Database; class Redis implements Ingest { diff --git a/src/Ingests/Storage.php b/src/Ingests/Storage.php index afa773ad..f3682ddb 100644 --- a/src/Ingests/Storage.php +++ b/src/Ingests/Storage.php @@ -3,10 +3,8 @@ namespace Laravel\Pulse\Ingests; use Carbon\CarbonImmutable; -use Illuminate\Support\Facades\DB; use Laravel\Pulse\Contracts\Ingest; use Laravel\Pulse\Contracts\Storage as StorageContract; -use Laravel\Pulse\Entries\Type; use RuntimeException; class Storage implements Ingest diff --git a/src/Storage/Database.php b/src/Storage/Database.php index 6377f0dc..e28f23bc 100644 --- a/src/Storage/Database.php +++ b/src/Storage/Database.php @@ -4,7 +4,6 @@ use Carbon\CarbonImmutable; use Illuminate\Database\Connection; -use Illuminate\Support\Facades\DB; use Laravel\Pulse\Contracts\Ingest; use Laravel\Pulse\Contracts\Storage; use Laravel\Pulse\Entries\Type; From ff4d88ce7aa7435b72f0e4ca20f66fe5afdda5e0 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Mon, 14 Aug 2023 16:29:56 +1000 Subject: [PATCH 03/26] wip --- :W | 216 ++++++++++++++++++ composer.json | 9 +- config/pulse.php | 2 +- src/Checks/QueueSize.php | 21 ++ src/Checks/SystemStats.php | 36 +++ src/Commands/CheckCommand.php | 87 ++----- src/Commands/WorkCommand.php | 3 +- src/Contracts/Ingest.php | 12 +- src/Contracts/Storage.php | 12 +- src/Facades/Pulse.php | 15 +- src/Handlers/HandleCacheInteraction.php | 4 +- src/Handlers/HandleException.php | 4 +- src/Handlers/HandleHttpRequest.php | 4 +- ...ddleware.php => HandleOutgoingRequest.php} | 6 +- src/Handlers/HandleProcessedJob.php | 4 +- src/Handlers/HandleProcessingJob.php | 4 +- src/Handlers/HandleQuery.php | 7 +- src/Handlers/HandleQueuedJob.php | 4 +- src/Handlers/HandleSystemStats.php | 38 +++ src/Http/Livewire/Usage.php | 4 +- src/Ingests/Redis.php | 31 ++- src/Ingests/Storage.php | 13 +- src/Pulse.php | 184 ++++++++++----- src/PulseServiceProvider.php | 48 ++-- src/Redis.php | 35 +-- src/Storage/Database.php | 34 +-- 26 files changed, 599 insertions(+), 238 deletions(-) create mode 100644 :W create mode 100644 src/Checks/QueueSize.php create mode 100644 src/Checks/SystemStats.php rename src/Handlers/{HttpRequestMiddleware.php => HandleOutgoingRequest.php} (84%) create mode 100644 src/Handlers/HandleSystemStats.php diff --git a/:W b/:W new file mode 100644 index 00000000..307200e7 --- /dev/null +++ b/:W @@ -0,0 +1,216 @@ +filters = collect([]); + $this->entriesQueue = collect([]); + $this->updatesQueue = collect([]); + } + + /** + * Stop recording entries. + */ + public function shouldNotRecord(): self + { + $this->shouldRecord = false; + + return $this; + } + + /** + * Filter incoming entries using the provided filter. + */ + public function filter(callable $filter) + { + $this->filters[] = $filter; + + return $this; + } + + /** + * Resolve the user's details using the given closure. + */ + public function resolveUsersUsing($callback): self + { + $this->usersResolver = $callback; + + return $this; + } + + /** + * Resolve the user's details using the given closure. + */ + public function resolveUsers(Collection $ids): Collection + { + if ($this->usersResolver) { + return collect(($this->usersResolver)($ids)); + } + + if (class_exists(\App\Models\User::class)) { + return \App\Models\User::findMany($ids); + } + + if (class_exists(\App\User::class)) { + return \App\User::findMany($ids); + } + + return $ids->map(fn ($id) => [ + 'id' => $id, + ]); + } + + /** + * Record the given entry. + */ + public function record(Entry $entry): void + { + $this->entriesQueue[] = $entry; + } + + /** + * Record the given entry update. + */ + public function recordUpdate(Update $update): void + { + $this->updatesQueue[] = $update; + } + + /** + * Store the queued entries and flush the queue. + */ + public function store(): void + { + // TODO: logging? report? We should have a "handlePulseExceptionsUsing" + // and allow user-land control, but by default we just ignore. + rescue(fn () => $this->ingest->ingest( + $this->entriesQueue->filter($this->shouldRecord(...)), + $this->updatesQueue->filter($this->shouldRecord(...)), + ), report: false); + + // TODO: lottery configuration? + rescue(fn () => Lottery::odds(1, 100) + ->winner(fn () => $this->ingest->trim((new CarbonImmutable)->subWeek())) + ->choose(), report: false); + + $this->entriesQueue = collect([]); + $this->updatesQueue = collect([]); + } + + /** + * Return the compiled CSS from the vendor directory. + */ + public function css(): string + { + return file_get_contents(__DIR__.'/../dist/pulse.css'); + } + + /** + * Return the compiled JavaScript from the vendor directory. + */ + public function js(): string + { + return file_get_contents(__DIR__.'/../dist/pulse.js'); + } + + /** + * Determine if the given request can access the Pulse dashboard. + */ + public function check(Request $request): bool + { + return ($this->authUsing ?: function () { + return App::environment('local'); + })($request); + } + + /** + * Set the callback that should be used to authorize Pulse users. + */ + public function auth(callable $callback): self + { + $this->authUsing = $callback; + + return $this; + } + + /** + * Configure Pulse to not register its migrations. + */ + public function ignoreMigrations(): self + { + $this->runsMigrations = false; + + return $this; + } + + /** + * Determine if Pulse may run migrations. + */ + public function runsMigrations(): bool + { + return $this->runsMigrations; + } + + /** + * Determine if the entry should be recorded. + */ + protected function shouldRecord(Entry|Update $entry): bool + { + return $this->shouldRecord && $this->filters->every(fn (callable $filter) => $filter($entry)); + } +} diff --git a/composer.json b/composer.json index 3d10723d..ac9681f9 100644 --- a/composer.json +++ b/composer.json @@ -25,6 +25,7 @@ "nesbot/carbon": "^2.67" }, "require-dev": { + "laravel/facade-documenter": "dev-main", "mockery/mockery": "^1.0", "orchestra/testbench": "^8.0", "pestphp/pest": "^2.0", @@ -64,5 +65,11 @@ } }, "minimum-stability": "dev", - "prefer-stable": true + "prefer-stable": true, + "repositories": { + "facade-documenter": { + "type": "vcs", + "url": "git@github.com:laravel-labs/facade-documenter.git" + } + } } diff --git a/config/pulse.php b/config/pulse.php index cb225672..7ab81e17 100644 --- a/config/pulse.php +++ b/config/pulse.php @@ -26,7 +26,7 @@ ], 'ingest' => [ - 'driver' => env('PULSE_INGEST_DRIVER', 'storage'), + 'driver' => env('PULSE_INGEST_DRIVER', 'redis'), 'storage' => [], diff --git a/src/Checks/QueueSize.php b/src/Checks/QueueSize.php new file mode 100644 index 00000000..201c8509 --- /dev/null +++ b/src/Checks/QueueSize.php @@ -0,0 +1,21 @@ +map(fn ($queue) => [ + 'queue' => $queue, + 'size' => Queue::size($queue), + 'failed' => collect(app('queue.failer')->all()) + ->filter(fn ($job) => $job->queue === $queue) + ->count(), + ]); + } +} diff --git a/src/Checks/SystemStats.php b/src/Checks/SystemStats.php new file mode 100644 index 00000000..210e6a55 --- /dev/null +++ b/src/Checks/SystemStats.php @@ -0,0 +1,36 @@ + Config::get('pulse.server_name'), + ...match (PHP_OS_FAMILY) { + 'Darwin' => [ + 'cpu_percent' => (int) `top -l 1 | grep -E "^CPU" | tail -1 | awk '{ print $3 + $5 }'`, + 'memory_total' => $memoryTotal = intval(`sysctl hw.memsize | grep -Eo '[0-9]+'` / 1024 / 1024), // MB + 'memory_used' => $memoryTotal - intval(intval(`vm_stat | grep 'Pages free' | grep -Eo '[0-9]+'`) * intval(`pagesize`) / 1024 / 1024), // MB + ], + 'Linux' => [ + 'cpu_percent' => (int) `top -bn1 | grep '%Cpu(s)' | tail -1 | grep -Eo '[0-9]+\.[0-9]+' | head -n 4 | tail -1 | awk '{ print 100 - $1 }'`, + 'memory_total' => $memoryTotal = intval(`cat /proc/meminfo | grep MemTotal | grep -E -o '[0-9]+'` / 1024), // MB + 'memory_used' => $memoryTotal - intval(`cat /proc/meminfo | grep MemAvailable | grep -E -o '[0-9]+'` / 1024), // MB + ], + default => throw new RuntimeException('The pulse:check command does not currently support '.PHP_OS_FAMILY), + }, + 'storage' => collect(Config::get('pulse.directories'))->map(fn ($directory) => [ + 'directory' => $directory, + 'total' => $total = intval(round(disk_total_space($directory) / 1024 / 1024)), // MB + 'used' => intval(round($total - (disk_free_space($directory) / 1024 / 1024))), // MB + ])->toJson(), + ]; + } +} diff --git a/src/Commands/CheckCommand.php b/src/Commands/CheckCommand.php index 7e49890b..d9cc965e 100644 --- a/src/Commands/CheckCommand.php +++ b/src/Commands/CheckCommand.php @@ -5,9 +5,13 @@ use Carbon\CarbonImmutable; use Illuminate\Console\Command; use Illuminate\Support\Facades\Cache; -use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Queue; use Illuminate\Support\Sleep; +use Laravel\Pulse\Checks\QueueSize; +use Laravel\Pulse\Checks\SystemStats; +use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Facades\Pulse; +use Laravel\Pulse\Handlers\HandleSystemStats; use RuntimeException; use Symfony\Component\Console\Attribute\AsCommand; @@ -36,7 +40,10 @@ class CheckCommand extends Command /** * Handle the command. */ - public function handle(): int + public function handle( + SystemStats $systemStats, + QueueSize $queueSize, + ): int { $lastRestart = Cache::get('illuminate:pulse:restart'); @@ -44,16 +51,12 @@ public function handle(): int while (true) { if (Cache::get('illuminate:pulse:restart') !== $lastRestart) { - $this->comment('Pulse restart requested. Exiting at '.$now->toDateTimeString()); - return self::SUCCESS; } $now = new CarbonImmutable(); if ($now->subSeconds($this->interval)->lessThan($lastSnapshotAt)) { - $this->comment('Sleeping for a second at '.$now->toDateTimeString()); - Sleep::for(1)->second(); continue; @@ -61,23 +64,13 @@ public function handle(): int $lastSnapshotAt = $now->floorSeconds($this->interval); - /* - * Check system stats. - */ + // TODO option to allow only server size stats to be collected. + // Needed for Vapor. - $stats = [ + Pulse::record(new Entry('pulse_servers', [ 'date' => $lastSnapshotAt->toDateTimeString(), - 'server' => config('pulse.server_name'), - ...$this->getStats(), - 'storage' => collect(config('pulse.directories'))->map(fn ($directory) => [ - 'directory' => $directory, - 'total' => $total = intval(round(disk_total_space($directory) / 1024 / 1024)), // MB - 'used' => intval(round($total - (disk_free_space($directory) / 1024 / 1024))), // MB - ])->toJson(), - ]; - - // TODO: do this via the ingest - DB::table('pulse_servers')->insert($stats); + ...$stats = $systemStats(), + ])); $this->line('[system stats] '.json_encode($stats)); @@ -86,58 +79,16 @@ public function handle(): int */ if (Cache::lock("illuminate:pulse:check-queue-sizes:{$lastSnapshotAt->timestamp}", $this->interval)->get()) { - $sizes = collect(config('pulse.queues')) - ->map(fn ($queue) => [ - 'date' => $lastSnapshotAt->toDateTimeString(), - 'queue' => $queue, - 'size' => Queue::size($queue), - 'failed' => collect(app('queue.failer')->all()) - ->filter(fn ($job) => $job->queue === $queue) - ->count(), - ]); - - DB::table('pulse_queue_sizes')->insert($sizes->all()); + $sizes = $queueSize()->each(fn ($queue) => Pulse::record(new Entry('pulse_queue_sizes', [ + 'date' => $lastSnapshotAt->toDateTimeString(), + ...$queue, + ]))); $this->line('[queue sizes] '.$sizes->toJson()); } - $this->comment('Stats and queue sizes checked at: '.$now->toDateTimeString()); + Pulse::store(); } } - /** - * Collect stats. - */ - protected function getStats(): array - { - return match (PHP_OS_FAMILY) { - 'Darwin' => $this->getDarwinStats(), - 'Linux' => $this->getLinuxStats(), - default => throw new RuntimeException('The pulse:check command does not currently support '.PHP_OS_FAMILY), - }; - } - - /** - * Collect stats for "Darwin" based systems. - */ - protected function getDarwinStats(): array - { - return [ - 'cpu_percent' => (int) `top -l 1 | grep -E "^CPU" | tail -1 | awk '{ print $3 + $5 }'`, - 'memory_total' => $memoryTotal = intval(`sysctl hw.memsize | grep -Eo '[0-9]+'` / 1024 / 1024), // MB - 'memory_used' => $memoryTotal - intval(intval(`vm_stat | grep 'Pages free' | grep -Eo '[0-9]+'`) * intval(`pagesize`) / 1024 / 1024), // MB - ]; - } - - /** - * Collect stats for "Linux" based systems. - */ - protected function getLinuxStats(): array - { - return [ - 'cpu_percent' => (int) `top -bn1 | grep '%Cpu(s)' | tail -1 | grep -Eo '[0-9]+\.[0-9]+' | head -n 4 | tail -1 | awk '{ print 100 - $1 }'`, - 'memory_total' => $memoryTotal = intval(`cat /proc/meminfo | grep MemTotal | grep -E -o '[0-9]+'` / 1024), // MB - 'memory_used' => $memoryTotal - intval(`cat /proc/meminfo | grep MemAvailable | grep -E -o '[0-9]+'` / 1024), // MB - ]; - } } diff --git a/src/Commands/WorkCommand.php b/src/Commands/WorkCommand.php index 51381b04..fdc48cfe 100644 --- a/src/Commands/WorkCommand.php +++ b/src/Commands/WorkCommand.php @@ -3,6 +3,7 @@ namespace Laravel\Pulse\Commands; use Carbon\CarbonImmutable; +use Carbon\CarbonInterval as Interval; use Illuminate\Console\Command; use Illuminate\Support\Facades\Cache; use Illuminate\Support\Sleep; @@ -48,7 +49,7 @@ public function handle(Ingest $ingest, Storage $storage): int if ($now->subMinute()->greaterThan($lastTrimmedDatabaseAt)) { $this->comment('Trimming the database at '.$now->toDateTimeString()); - $storage->trim($now->subWeek()); + $storage->retain(Interval::week()); $lastTrimmedDatabaseAt = $now; } diff --git a/src/Contracts/Ingest.php b/src/Contracts/Ingest.php index 2a29c020..e6108d9d 100644 --- a/src/Contracts/Ingest.php +++ b/src/Contracts/Ingest.php @@ -2,19 +2,23 @@ namespace Laravel\Pulse\Contracts; -use Carbon\CarbonImmutable; +use Carbon\CarbonInterval as Interval; +use Illuminate\Support\Collection; interface Ingest { /** * Ingest the entries and updates. + * + * @param \Illuminate\Support\Collection $entries + * @param \Illuminate\Support\Collection $updates */ - public function ingest(array $entries, array $updates): void; + public function ingest(Collection $entries, Collection $updates): void; /** - * Trim the ingested entries. + * Retain the ingested entries only for the given interval. */ - public function trim(CarbonImmutable $oldest): void; + public function retain(Interval $interval): void; /** * Store the ingested entries. diff --git a/src/Contracts/Storage.php b/src/Contracts/Storage.php index 593d13ac..a1316cc5 100644 --- a/src/Contracts/Storage.php +++ b/src/Contracts/Storage.php @@ -2,17 +2,21 @@ namespace Laravel\Pulse\Contracts; -use Carbon\CarbonImmutable; +use Carbon\CarbonInterval as Interval; +use Illuminate\Support\Collection; interface Storage { /** * Store the entries and updates. + * + * @param \Illuminate\Support\Collection $entries + * @param \Illuminate\Support\Collection $updates */ - public function store(array $entries, array $updates): void; + public function store(Collection $entries, Collection $updates): void; /** - * Trim the ingest. + * Retain the stored entries only for the given interval. */ - public function trim(CarbonImmutable $oldest): void; + public function retain(Interval $interval): void; } diff --git a/src/Facades/Pulse.php b/src/Facades/Pulse.php index 7610e31c..f28f0a08 100644 --- a/src/Facades/Pulse.php +++ b/src/Facades/Pulse.php @@ -6,17 +6,20 @@ /** * @method static \Laravel\Pulse\Pulse shouldNotRecord() - * @method static void filter(callable $filter) - * @method static \Laravel\Pulse\Pulse resolveUsersUsing(void $callback) + * @method static \Laravel\Pulse\Pulse filter(callable $filter) + * @method static \Laravel\Pulse\Pulse record(\Laravel\Pulse\Entries\Entry $entry) + * @method static \Laravel\Pulse\Pulse recordUpdate(\Laravel\Pulse\Entries\Update $update) + * @method static \Laravel\Pulse\Pulse store() + * @method static \Laravel\Pulse\Pulse resolveUsersUsing(callable $callback) * @method static \Illuminate\Support\Collection resolveUsers(\Illuminate\Support\Collection $ids) - * @method static void record(\Laravel\Pulse\Entries\Entry $entry) - * @method static void recordUpdate(\Laravel\Pulse\Entries\Update $update) - * @method static void store() * @method static string css() * @method static string js() * @method static bool check(\Illuminate\Http\Request $request) - * @method static \Laravel\Pulse\Pulse auth(\Closure $callback) + * @method static \Laravel\Pulse\Pulse auth(callable $callback) * @method static \Laravel\Pulse\Pulse ignoreMigrations() + * @method static bool runsMigrations() + * @method static \Laravel\Pulse\Pulse handleExceptionsUsing(callable $callback) + * @method static void rescue(callable $callback) * @method static void listenForStorageOpportunities(\Illuminate\Foundation\Application $app) * * @see \Laravel\Pulse\Pulse diff --git a/src/Handlers/HandleCacheInteraction.php b/src/Handlers/HandleCacheInteraction.php index 81ae85fa..4671786c 100644 --- a/src/Handlers/HandleCacheInteraction.php +++ b/src/Handlers/HandleCacheInteraction.php @@ -16,7 +16,7 @@ class HandleCacheInteraction */ public function __invoke(CacheHit|CacheMissed $event): void { - rescue(function () use ($event) { + Pulse::rescue(function () use ($event) { $now = new CarbonImmutable(); if (str_starts_with($event->key, 'illuminate:')) { @@ -29,6 +29,6 @@ public function __invoke(CacheHit|CacheMissed $event): void 'key' => $event->key, 'user_id' => Auth::id(), ])); - }, report: false); + }); } } diff --git a/src/Handlers/HandleException.php b/src/Handlers/HandleException.php index f359873d..94afade6 100644 --- a/src/Handlers/HandleException.php +++ b/src/Handlers/HandleException.php @@ -16,7 +16,7 @@ class HandleException */ public function __invoke(Throwable $e): void { - rescue(function () use ($e) { + Pulse::rescue(function () use ($e) { $now = new CarbonImmutable(); Pulse::record(new Entry('pulse_exceptions', [ @@ -25,7 +25,7 @@ public function __invoke(Throwable $e): void 'class' => $e::class, 'location' => $this->getLocation($e), ])); - }, report: false); + }); } /** diff --git a/src/Handlers/HandleHttpRequest.php b/src/Handlers/HandleHttpRequest.php index 393b8527..b010fcd2 100644 --- a/src/Handlers/HandleHttpRequest.php +++ b/src/Handlers/HandleHttpRequest.php @@ -17,7 +17,7 @@ class HandleHttpRequest */ public function __invoke(Carbon $startedAt, Request $request, Response $response): void { - rescue(function () use ($startedAt, $request) { + Pulse::rescue(function () use ($startedAt, $request) { $now = new CarbonImmutable(); Pulse::record(new Entry('pulse_requests', [ @@ -26,6 +26,6 @@ public function __invoke(Carbon $startedAt, Request $request, Response $response 'route' => $request->method().' '.Str::start(($request->route()?->uri() ?? $request->path()), '/'), 'duration' => $startedAt->diffInMilliseconds(), ])); - }, report: false); + }); } } diff --git a/src/Handlers/HttpRequestMiddleware.php b/src/Handlers/HandleOutgoingRequest.php similarity index 84% rename from src/Handlers/HttpRequestMiddleware.php rename to src/Handlers/HandleOutgoingRequest.php index b3c08faf..2c17b89b 100644 --- a/src/Handlers/HttpRequestMiddleware.php +++ b/src/Handlers/HandleOutgoingRequest.php @@ -10,7 +10,7 @@ use Laravel\Pulse\Facades\Pulse; use Psr\Http\Message\RequestInterface; -class HttpRequestMiddleware +class HandleOutgoingRequest { /** * Invoke the middleware. @@ -21,11 +21,11 @@ public function __invoke(callable $handler) $startedAt = new CarbonImmutable; return $handler($request, $options)->then(function ($response) use ($request, $startedAt) { - rescue(fn () => $this->record($request, $startedAt, new CarbonImmutable), report: false); + Pulse::rescue(fn () => $this->record($request, $startedAt, new CarbonImmutable)); return $response; }, function ($exception) use ($request, $startedAt) { - rescue(fn () => $this->record($request, $startedAt, new CarbonImmutable), report: false); + Pulse::rescue(fn () => $this->record($request, $startedAt, new CarbonImmutable)); return new RejectedPromise($exception); }); diff --git a/src/Handlers/HandleProcessedJob.php b/src/Handlers/HandleProcessedJob.php index fc976a03..9303d72b 100644 --- a/src/Handlers/HandleProcessedJob.php +++ b/src/Handlers/HandleProcessedJob.php @@ -15,7 +15,7 @@ class HandleProcessedJob */ public function __invoke(JobProcessed|JobFailed $event): void { - rescue(function () use ($event) { + Pulse::rescue(function () use ($event) { $now = new CarbonImmutable(); // TODO respect slow limit configuration? I don't think we should @@ -26,6 +26,6 @@ public function __invoke(JobProcessed|JobFailed $event): void (string) $event->job->getJobId(), $now->toDateTimeString('millisecond') )); - }, report: false); + }); } } diff --git a/src/Handlers/HandleProcessingJob.php b/src/Handlers/HandleProcessingJob.php index 4febfa44..869455b2 100644 --- a/src/Handlers/HandleProcessingJob.php +++ b/src/Handlers/HandleProcessingJob.php @@ -14,13 +14,13 @@ class HandleProcessingJob */ public function __invoke(JobProcessing $event): void { - rescue(function () use ($event) { + Pulse::rescue(function () use ($event) { $now = new CarbonImmutable(); Pulse::recordUpdate(new JobStarted( (string) $event->job->getJobId(), $now->toDateTimeString('millisecond') )); - }, report: false); + }); } } diff --git a/src/Handlers/HandleQuery.php b/src/Handlers/HandleQuery.php index 06421f75..8181d15a 100644 --- a/src/Handlers/HandleQuery.php +++ b/src/Handlers/HandleQuery.php @@ -5,6 +5,7 @@ use Carbon\CarbonImmutable; use Illuminate\Database\Events\QueryExecuted; use Illuminate\Support\Facades\Auth; +use Illuminate\Support\Facades\Config; use Laravel\Pulse\Entries\Entry; use Laravel\Pulse\Facades\Pulse; @@ -15,10 +16,10 @@ class HandleQuery */ public function __invoke(QueryExecuted $event): void { - rescue(function () use ($event) { + Pulse::rescue(function () use ($event) { $now = new CarbonImmutable(); - if ($event->time < config('pulse.slow_query_threshold')) { + if ($event->time < Config::get('pulse.slow_query_threshold')) { return; } @@ -28,6 +29,6 @@ public function __invoke(QueryExecuted $event): void 'sql' => $event->sql, 'duration' => round($event->time), ])); - }, report: false); + }); } } diff --git a/src/Handlers/HandleQueuedJob.php b/src/Handlers/HandleQueuedJob.php index 6b2b024f..3d4acd68 100644 --- a/src/Handlers/HandleQueuedJob.php +++ b/src/Handlers/HandleQueuedJob.php @@ -15,7 +15,7 @@ class HandleQueuedJob */ public function __invoke(JobQueued $event): void { - rescue(function () use ($event) { + Pulse::rescue(function () use ($event) { $now = new CarbonImmutable(); Pulse::record(new Entry('pulse_jobs', [ @@ -26,6 +26,6 @@ public function __invoke(JobQueued $event): void : $event->job::class, 'job_id' => $event->id, ])); - }, report: false); + }); } } diff --git a/src/Handlers/HandleSystemStats.php b/src/Handlers/HandleSystemStats.php new file mode 100644 index 00000000..43154a21 --- /dev/null +++ b/src/Handlers/HandleSystemStats.php @@ -0,0 +1,38 @@ + $now->toDateTimeString(), + 'server' => config('pulse.server_name'), + ...match (PHP_OS_FAMILY) { + 'Darwin' => [ + 'cpu_percent' => (int) `top -l 1 | grep -E "^CPU" | tail -1 | awk '{ print $3 + $5 }'`, + 'memory_total' => $memoryTotal = intval(`sysctl hw.memsize | grep -Eo '[0-9]+'` / 1024 / 1024), // MB + 'memory_used' => $memoryTotal - intval(intval(`vm_stat | grep 'Pages free' | grep -Eo '[0-9]+'`) * intval(`pagesize`) / 1024 / 1024), // MB + ], + 'Linux' => [ + 'cpu_percent' => (int) `top -bn1 | grep '%Cpu(s)' | tail -1 | grep -Eo '[0-9]+\.[0-9]+' | head -n 4 | tail -1 | awk '{ print 100 - $1 }'`, + 'memory_total' => $memoryTotal = intval(`cat /proc/meminfo | grep MemTotal | grep -E -o '[0-9]+'` / 1024), // MB + 'memory_used' => $memoryTotal - intval(`cat /proc/meminfo | grep MemAvailable | grep -E -o '[0-9]+'` / 1024), // MB + ], + default => throw new RuntimeException('The pulse:check command does not currently support '.PHP_OS_FAMILY), + }, + 'storage' => collect(config('pulse.directories'))->map(fn ($directory) => [ + 'directory' => $directory, + 'total' => $total = intval(round(disk_total_space($directory) / 1024 / 1024)), // MB + 'used' => intval(round($total - (disk_free_space($directory) / 1024 / 1024))), // MB + ])->toJson(), + ])); + + return $stats; + } +} diff --git a/src/Http/Livewire/Usage.php b/src/Http/Livewire/Usage.php index 2cbf3691..33dc744c 100644 --- a/src/Http/Livewire/Usage.php +++ b/src/Http/Livewire/Usage.php @@ -99,10 +99,10 @@ protected function userRequestCounts(): array return $user ? [ 'count' => $row->count, 'user' => [ - 'name' => $user['name'] ?? 'User: '.$user['id'], + 'name' => $user['name'], // "extra" rather than 'email' // avatar for pretty-ness? - 'email' => $user['email'] ?? null, + 'email' => $user['email'], ], ] : null; }) diff --git a/src/Ingests/Redis.php b/src/Ingests/Redis.php index 1d1de8e2..32eeb43f 100644 --- a/src/Ingests/Redis.php +++ b/src/Ingests/Redis.php @@ -2,9 +2,11 @@ namespace Laravel\Pulse\Ingests; -use Carbon\CarbonImmutable; +use Carbon\CarbonInterval as Interval; +use Illuminate\Support\Collection; use Laravel\Pulse\Contracts\Ingest; use Laravel\Pulse\Contracts\Storage; +use Laravel\Pulse\Entries\Entry; use Laravel\Pulse\Entries\Update; use Laravel\Pulse\Redis as RedisConnection; @@ -16,9 +18,9 @@ class Redis implements Ingest protected string $stream = 'illuminate:pulse:entries'; /** - * Create a new Redis ingest instance. + * Create a new Redis Ingest instance. */ - public function __construct(protected RedisConnection $connection) + public function __construct(protected array $config, protected RedisConnection $connection) { // } @@ -26,20 +28,20 @@ public function __construct(protected RedisConnection $connection) /** * Ingest the entries and updates. */ - public function ingest(array $entries, array $updates): void + public function ingest(Collection $entries, Collection $updates): void { - if ($entries === [] && $updates === []) { + if ($entries->isEmpty() && $updates->isEmpty()) { return; } $this->connection->pipeline(function (RedisConnection $pipeline) use ($entries, $updates) { - collect($entries)->each(fn ($rows, $table) => collect($rows) - ->each(fn ($data) => $pipeline->xadd($this->stream, [ + $entries->groupBy('table') + ->each(fn ($rows, $table) => $rows->each(fn ($data) => $pipeline->xadd($this->stream, [ 'type' => $table, 'data' => json_encode($data), ]))); - collect($updates)->each(fn ($update) => $pipeline->xadd($this->stream, [ + $updates->each(fn ($update) => $pipeline->xadd($this->stream, [ 'type' => 'pulse_update', 'data' => serialize($update), ])); @@ -49,9 +51,9 @@ public function ingest(array $entries, array $updates): void /** * Trim the ingested entries. */ - public function trim(CarbonImmutable $oldest): void + public function retain(Interval $interval): void { - $this->connection->xtrim($this->stream, 'MINID', '~', $this->connection->streamIdAt($oldest)); + $this->connection->xtrim($this->stream, 'MINID', '~', $this->connection->streamIdAt($interval->copy()->invert())); } /** @@ -71,14 +73,11 @@ public function store(Storage $storage, int $count): int ->values() ->partition(fn ($entry) => $entry['type'] !== 'pulse_update'); - $inserts = $inserts - ->groupBy('type') - ->map->map(fn ($data): array => json_decode($data['data'], true)); + $inserts = $inserts->map(fn ($data): Entry => new Entry($data['type'], json_decode($data['data'], true))); - $updates = $updates - ->map(fn ($data): Update => unserialize($data['data'])); + $updates = $updates->map(fn ($data): Update => unserialize($data['data'])); - $storage->store($inserts->all(), $updates->all()); + $storage->store($inserts, $updates); $this->connection->xdel($this->stream, $keys->all()); diff --git a/src/Ingests/Storage.php b/src/Ingests/Storage.php index f3682ddb..3e79f9d0 100644 --- a/src/Ingests/Storage.php +++ b/src/Ingests/Storage.php @@ -2,7 +2,8 @@ namespace Laravel\Pulse\Ingests; -use Carbon\CarbonImmutable; +use Carbon\CarbonInterval as Interval; +use Illuminate\Support\Collection; use Laravel\Pulse\Contracts\Ingest; use Laravel\Pulse\Contracts\Storage as StorageContract; use RuntimeException; @@ -17,23 +18,23 @@ public function __construct(protected StorageContract $storage) /** * Ingest the entries and updates. */ - public function ingest(array $entries, array $updates): void + public function ingest(Collection $entries, Collection $updates): void { $this->storage->store($entries, $updates); } /** - * Trim the ingested entries. + * Retain the ingested entries only for the given interval. */ - public function trim(CarbonImmutable $oldest): void + public function retain(Interval $interval): void { - $this->storage->trim($oldest); + $this->storage->retain($interval); } /** * Store the ingested entries. */ - public function store(Storage $store, int $count): int + public function store(StorageContract $store, int $count): int { throw new RuntimeException('The storage ingest driver does not need to process entries.'); } diff --git a/src/Pulse.php b/src/Pulse.php index 7399a280..ae29be2e 100644 --- a/src/Pulse.php +++ b/src/Pulse.php @@ -11,45 +11,63 @@ use Laravel\Pulse\Contracts\Ingest; use Laravel\Pulse\Entries\Entry; use Laravel\Pulse\Entries\Update; +use Throwable; class Pulse { use ListensForStorageOpportunities; /** - * The callback that should be used to authenticate Pulse users. + * The list of queued entries to be stored. + * + * @var \Illuminate\Support\Collection */ - public ?Closure $authUsing = null; + protected Collection $entriesQueue; /** - * Indicates if Pulse migrations will be run. + * The list of queued entry updates. + * + * @var \Illuminate\Support\Collection */ - public bool $runsMigrations = true; + protected Collection $updatesQueue; /** - * The list of queued entries to be stored. + * Indicates if Pulse should record entries. */ - public array $entriesQueue = []; + protected bool $shouldRecord = true; /** - * The list of queued entry updates. + * The entry filters. + * + * @var \Illuminate\Support\Collection */ - public array $updatesQueue = []; + protected Collection $filters; /** - * Indicates if Pulse should record entries. + * Users resolver. + * + * @var (callable(\Illuminate\Support\Collection): iterable)|null */ - public bool $shouldRecord = true; + protected $usersResolver; /** - * Users resolver. + * The callback that should be used to authenticate Pulse users. + * + * @var (callable(\Illuminate\Http\Request): bool)|null */ - public ?Closure $usersResolver = null; + protected $authUsing = null; /** - * The entry filters. + * Indicates if Pulse migrations will be run. + */ + protected bool $runsMigrations = true; + + /** + * Handle exceptions using the given callback. + * + * @var (callable(\Throwable): mixed)|null */ - public Collection $filters; + protected $handleExceptionsUsing = null; /** * Create a new Pulse instance. @@ -57,6 +75,8 @@ class Pulse public function __construct(protected Ingest $ingest) { $this->filters = collect([]); + + $this->clearQueue(); } /** @@ -71,8 +91,10 @@ public function shouldNotRecord(): self /** * Filter incoming entries using the provided filter. + * + * @param (callable(\Laravel\Pulse\Entries\Entry|\Laravel\Pulse\Entries\Update): bool) $filter */ - public function filter(callable $filter) + public function filter(callable $filter): self { $this->filters[] = $filter; @@ -80,73 +102,89 @@ public function filter(callable $filter) } /** - * Resolve the user's details using the given closure. + * Record the given entry. */ - public function resolveUsersUsing($callback): self + public function record(Entry $entry): self { - $this->usersResolver = $callback; + if ($this->shouldRecord) { + $this->entriesQueue[] = $entry; + } return $this; } /** - * Resolve the user's details using the given closure. + * Record the given entry update. */ - public function resolveUsers(Collection $ids): Collection + public function recordUpdate(Update $update): self { - if ($this->usersResolver) { - return collect(($this->usersResolver)($ids)); + if ($this->shouldRecord) { + $this->updatesQueue[] = $update; } - if (class_exists(\App\Models\User::class)) { - return \App\Models\User::findMany($ids); - } + return $this; + } - if (class_exists(\App\User::class)) { - return \App\User::findMany($ids); + /** + * Store the queued entries and flush the queue. + */ + public function store(): self + { + if (! $this->shouldRecord) { + return $this->clearQueue(); } - return $ids->map(fn ($id) => [ - 'id' => $id, - ]); + // TODO: logging? report? We should have a "handlePulseExceptionsUsing" + // and allow user-land control, but by default we just ignore. + $this->rescue(fn () => $this->ingest->ingest( + $this->entriesQueue->filter($this->shouldRecord(...)), + $this->updatesQueue->filter($this->shouldRecord(...)), + )); + + // TODO: lottery configuration? + $this->rescue(fn () => Lottery::odds(1, 100) + ->winner(fn () => $this->ingest->retain((new CarbonImmutable)->subWeek())) + ->choose()); + + return $this->clearQueue(); } /** - * Record the given entry. + * Determine if the entry should be recorded. */ - public function record(Entry $entry): void + protected function shouldRecord(Entry|Update $entry): bool { - if ($this->shouldRecord($entry)) { - $this->entriesQueue[$entry->table][] = $entry->attributes; - } + return $this->filters->every(fn (callable $filter) => $filter($entry)); } /** - * Record the given entry update. + * Resolve the user's details using the given closure. */ - public function recordUpdate(Update $update): void + public function resolveUsersUsing(callable $callback): self { - if ($this->shouldRecord($update)) { - $this->updatesQueue[] = $update; - } + $this->usersResolver = $callback; + + return $this; } /** - * Store the queued entries and flush the queue. + * Resolve the user's details using the given closure. */ - public function store(): void + public function resolveUsers(Collection $ids): Collection { - // TODO: logging? report? - rescue(fn () => $this->ingest->ingest( - $this->entriesQueue, $this->updatesQueue, - ), report: false); + if ($this->usersResolver) { + return collect(($this->usersResolver)($ids)); + } - // TODO: lottery configuration? - rescue(fn () => Lottery::odds(1, 100) - ->winner(fn () => $this->ingest->trim((new CarbonImmutable)->subWeek())) - ->choose(), report: false); + if (class_exists(\App\Models\User::class)) { + return \App\Models\User::whereKey($ids)->get(['name', 'email']); + } + + if (class_exists(\App\User::class)) { + return \App\User::whereKey($ids)->get(['name', 'email']); + } - $this->entriesQueue = $this->updatesQueue = []; + return $ids->map(fn ($id) => ['name' => "User ID: {$id}"]); } /** @@ -170,15 +208,15 @@ public function js(): string */ public function check(Request $request): bool { - return ($this->authUsing ?: function () { - return App::environment('local'); - })($request); + return ($this->authUsing ?: fn () => App::environment('local'))($request); } /** * Set the callback that should be used to authorize Pulse users. + * + * @param (callable(\Illuminate\Http\Request): bool) $callback */ - public function auth(Closure $callback): self + public function auth(callable $callback): self { $this->authUsing = $callback; @@ -204,10 +242,40 @@ public function runsMigrations(): bool } /** - * Determine if the entry should be recorded. + * Handle exceptions using the given callback. + * + * @param (callable(\Throwable): mixed) $callback */ - protected function shouldRecord(Entry|Update $entry): bool + public function handleExceptionsUsing(callable $callback): self + { + $this->handleExceptionsUsing = $callback; + + return $this; + } + + /** + * Execute the given callback handling any exceptions. + * + * @param (callable(): mixed) $callback + */ + public function rescue(callable $callback): void + { + try { + $callback(); + } catch (Throwable $e) { + ($this->handleExceptionsUsing)($e); + } + } + + /** + * Clear any pending entries on the queue. + */ + protected function clearQueue(): self { - return $this->shouldRecord && $this->filters->every(fn (callable $filter) => $filter($entry)); + $this->entriesQueue = collect([]); + + $this->updatesQueue = collect([]); + + return $this; } } diff --git a/src/PulseServiceProvider.php b/src/PulseServiceProvider.php index 5e428062..d2c9d395 100644 --- a/src/PulseServiceProvider.php +++ b/src/PulseServiceProvider.php @@ -13,6 +13,7 @@ use Illuminate\Queue\Events\JobProcessing; use Illuminate\Queue\Events\JobQueued; use Illuminate\Support\Facades\Blade; +use Illuminate\Support\Facades\Config; use Illuminate\Support\Facades\Event; use Illuminate\Support\Facades\Http; use Illuminate\Support\Facades\Route; @@ -26,11 +27,11 @@ use Laravel\Pulse\Handlers\HandleCacheInteraction; use Laravel\Pulse\Handlers\HandleException; use Laravel\Pulse\Handlers\HandleHttpRequest; +use Laravel\Pulse\Handlers\HandleOutgoingRequest; use Laravel\Pulse\Handlers\HandleProcessedJob; use Laravel\Pulse\Handlers\HandleProcessingJob; use Laravel\Pulse\Handlers\HandleQuery; use Laravel\Pulse\Handlers\HandleQueuedJob; -use Laravel\Pulse\Handlers\HttpRequestMiddleware; use Laravel\Pulse\Http\Livewire\Cache; use Laravel\Pulse\Http\Livewire\Exceptions; use Laravel\Pulse\Http\Livewire\PeriodSelector; @@ -61,27 +62,31 @@ public function register(): void $this->app->singleton(Pulse::class); - $this->app->bind(Storage::class, function ($app) { - $driver = config('pulse.storage.driver'); + $this->app->singleton(Storage::class, function ($app) { + $driver = Config::get('pulse.storage.driver'); - $config = config("pulse.storage.{$driver}"); + $config = Config::get("pulse.storage.{$driver}"); - return new Database($config, $app['db']->connection($config['connection'])); + return new Database($config, $app['db']); }); - $this->app->bind(Ingest::class, function ($app) { - $driver = config('pulse.ingest.driver'); + $this->app->singleton(Ingest::class, function ($app) { + $driver = Config::get('pulse.ingest.driver'); if ($driver === 'storage') { return $app[StorageIngest::class]; } - $config = config("pulse.ingest.{$driver}"); + $ingestConfig = Config::get("pulse.ingest.{$driver}"); - return new RedisIngest($config, $app['redis']->connection($config['connection'])); - }); + $redisConfig = [ + ...Config::get("database.redis.options"), + ...Config::get("database.redis.{$ingestConfig['connection']}"), + ...$ingestConfig, + ]; - // $this->app->scoped(RedisIngest::class, fn () => new RedisIngest(app('redis')->connection())); + return new RedisIngest($ingestConfig, new Redis($redisConfig, $app['redis'])); + }); $this->mergeConfigFrom( __DIR__.'/../config/pulse.php', 'pulse' @@ -140,7 +145,7 @@ protected function listenForEvents(): void ], HandleProcessedJob::class); if (method_exists(Factory::class, 'globalMiddleware')) { - Http::globalMiddleware(new HttpRequestMiddleware); + Http::globalMiddleware(new HandleOutgoingRequest); } // TODO: Telescope passes the container like this, but I'm unsure how it works with Octane. @@ -154,17 +159,14 @@ protected function listenForEvents(): void protected function registerRoutes(): void { Route::group([ - 'domain' => config('pulse.domain', null), - 'middleware' => config('pulse.middleware', 'web'), - 'namespace' => 'Laravel\Pulse\Http\Controllers', - 'prefix' => config('pulse.path'), - ], function () { - Route::get('/', function (Pulse $pulse) { - $pulse->shouldRecord = false; - - return view('pulse::dashboard'); - }); - }); + 'domain' => Config::get('pulse.domain', null), + 'middleware' => Config::get('pulse.middleware', 'web'), + 'prefix' => Config::get('pulse.path'), + ], fn () => Route::get('/', function (Pulse $pulse) { + $pulse->shouldNotRecord(); + + return view('pulse::dashboard'); + })); } /** diff --git a/src/Redis.php b/src/Redis.php index 122ca068..a3efa270 100644 --- a/src/Redis.php +++ b/src/Redis.php @@ -2,8 +2,9 @@ namespace Laravel\Pulse; -use Carbon\CarbonImmutable; +use Carbon\CarbonInterval as Interval; use Illuminate\Redis\Connections\Connection; +use Illuminate\Redis\RedisManager; use Predis\Client as Predis; use Predis\Pipeline\Pipeline; use Redis as PhpRedis; @@ -12,6 +13,8 @@ /** * @mixin \Redis * @mixin \Predis\Client + * + * @internal */ class Redis { @@ -20,10 +23,10 @@ class Redis * * @param \Redis|\Predis\Client|\Predis\Pipeline\Pipeline|null $client */ - public function __construct(protected ?Connection $connection = null, protected $client = null) + public function __construct(protected array $config, protected ?RedisManager $manager = null, protected $client = null) { - if (array_filter(func_get_args()) === []) { - throw new RuntimeException('Must provider a connection or client.'); + if ($manager === null && $client === null) { + throw new RuntimeException('Must provider a manager or client.'); } } @@ -52,11 +55,9 @@ public function xrange($key, $start, $end, $count = null) */ public function xtrim($key, $strategy, $strategyModifier, $threshold) { - $prefix = config('database.redis.options.prefix'); - if ($this->isPhpRedis()) { // PHP Redis does not support the minid strategy. - return $this->client()->rawCommand('XTRIM', $prefix.$key, $strategy, $strategyModifier, $threshold); + return $this->client()->rawCommand('XTRIM', $this->config['prefix'].$key, $strategy, $strategyModifier, $threshold); } return $this->client()->xtrim($key, [$strategy, $strategyModifier], $threshold); @@ -69,24 +70,21 @@ public function pipeline(callable $closure): array { // TODO explain this code - lol // ensure we run against a connection... - return $this->connection->pipeline(function ($redis) use ($closure) { - $closure(new self(client: $redis)); + return $this->connection()->pipeline(function ($redis) use ($closure) { + $closure(new self($this->config, client: $redis)); }); } /** * Get the ID of the stream at a given time. */ - public function streamIdAt(CarbonImmutable $timestamp): string + public function streamIdAt(Interval $interval): string { - // TODO: pass through intervals rather than date instances everywhere - $diff = (new CarbonImmutable)->diffInMilliseconds($timestamp); - $redisTime = $this->client()->time(); $redisTimestamp = $redisTime[0].substr($redisTime[1], 0, 3); - return (string) ($redisTimestamp - $diff); + return (string) ($redisTimestamp + $interval->totalMilliseconds); } /** @@ -110,7 +108,12 @@ protected function isPredis(): bool */ protected function client(): PhpRedis|Predis|Pipeline { - return $this->connection?->client() ?? $this->client; + return $this->connection()?->client() ?? $this->client; + } + + protected function connection(): ?Connection + { + return $this->manager?->connection($this->config['connection']); } /** @@ -118,6 +121,6 @@ protected function client(): PhpRedis|Predis|Pipeline */ public function __call(string $method, array $parameters): mixed { - return ($this->connection ?? $this->client)->{$method}(...$parameters); + return ($this->connection() ?? $this->client)->{$method}(...$parameters); } } diff --git a/src/Storage/Database.php b/src/Storage/Database.php index e28f23bc..c85fbc33 100644 --- a/src/Storage/Database.php +++ b/src/Storage/Database.php @@ -3,14 +3,16 @@ namespace Laravel\Pulse\Storage; use Carbon\CarbonImmutable; +use Carbon\CarbonInterval as Interval; use Illuminate\Database\Connection; -use Laravel\Pulse\Contracts\Ingest; +use Illuminate\Database\DatabaseManager; +use Illuminate\Support\Collection; use Laravel\Pulse\Contracts\Storage; use Laravel\Pulse\Entries\Type; class Database implements Storage { - public function __construct(protected array $config, protected Connection $db) + public function __construct(protected array $config, protected DatabaseManager $manager) { // } @@ -18,29 +20,33 @@ public function __construct(protected array $config, protected Connection $db) /** * Store the entries and updates. */ - public function store(array $entries, array $updates): void + public function store(Collection $entries, Collection $updates): void { - if ($entries === [] && $updates === []) { + if ($entries->isEmpty() && $updates->isEmpty()) { return; } - $this->db->transaction(function () use ($entries, $updates) { - collect($entries)->each(fn ($rows, $table) => collect($rows) - ->chunk(1000) - ->map->all() - ->each($this->db->table($table)->insert(...))); + $this->connection()->transaction(function () use ($entries, $updates) { + $entries->groupBy('table')->each(fn ($rows, $table) => $rows->chunk(1000) + ->map(fn ($inserts) => $inserts->pluck('attributes')->all()) + ->each($this->connection()->table($table)->insert(...))); - collect($updates)->each(fn ($update) => $update->perform($db)); + $updates->each(fn ($update) => $update->perform($db)); }); } /** - * Trim the ingest. + * Retain the ingested entries only for the given interval. */ - public function trim(CarbonImmutable $oldest): void + public function retain(Interval $interval): void { - Type::all()->each(fn (Type $type) => $this->db->table($type->value) - ->where('date', '<', $oldest->toDateTimeString()) + Type::all()->each(fn (Type $type) => $this->connection()->table($type->value) + ->where('date', '<', (new CarbonImmutable)->subSeconds($interval->seconds)->toDateTimeString()) ->delete()); } + + protected function connection(): Connection + { + return $this->manager->connection($this->config['connection']); + } } From 8128860f649d0577e5bd9f0fe3a17e21b253d6b4 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Mon, 14 Aug 2023 16:52:25 +1000 Subject: [PATCH 04/26] wip --- src/Commands/CheckCommand.php | 14 +++++++------- src/Commands/WorkCommand.php | 14 +++++--------- src/Http/Livewire/Servers.php | 2 +- src/Ingests/Redis.php | 6 ++++-- 4 files changed, 17 insertions(+), 19 deletions(-) diff --git a/src/Commands/CheckCommand.php b/src/Commands/CheckCommand.php index d9cc965e..8a0cd42d 100644 --- a/src/Commands/CheckCommand.php +++ b/src/Commands/CheckCommand.php @@ -50,12 +50,12 @@ public function handle( $lastSnapshotAt = (new CarbonImmutable)->floorSeconds($this->interval); while (true) { + $now = new CarbonImmutable(); + if (Cache::get('illuminate:pulse:restart') !== $lastRestart) { return self::SUCCESS; } - $now = new CarbonImmutable(); - if ($now->subSeconds($this->interval)->lessThan($lastSnapshotAt)) { Sleep::for(1)->second(); @@ -64,18 +64,19 @@ public function handle( $lastSnapshotAt = $now->floorSeconds($this->interval); - // TODO option to allow only server size stats to be collected. - // Needed for Vapor. + /* + * Collect server stats. + */ Pulse::record(new Entry('pulse_servers', [ 'date' => $lastSnapshotAt->toDateTimeString(), ...$stats = $systemStats(), ])); - $this->line('[system stats] '.json_encode($stats)); + $this->line('[system stats] '.json_encode($stats, flags: JSON_THROW_ON_ERROR)); /* - * Check queue sizes. + * Collect queue sizes. */ if (Cache::lock("illuminate:pulse:check-queue-sizes:{$lastSnapshotAt->timestamp}", $this->interval)->get()) { @@ -90,5 +91,4 @@ public function handle( Pulse::store(); } } - } diff --git a/src/Commands/WorkCommand.php b/src/Commands/WorkCommand.php index fdc48cfe..dd585505 100644 --- a/src/Commands/WorkCommand.php +++ b/src/Commands/WorkCommand.php @@ -35,30 +35,26 @@ public function handle(Ingest $ingest, Storage $storage): int { $lastRestart = Cache::get('illuminate:pulse:restart'); - $lastTrimmedDatabaseAt = (new CarbonImmutable)->startOfMinute(); + $lastTrimmedStorageAt = (new CarbonImmutable)->startOfMinute(); while (true) { $now = new CarbonImmutable; if (Cache::get('illuminate:pulse:restart') !== $lastRestart) { - $this->comment('Pulse restart requested. Exiting at '.$now->toDateTimeString()); - return self::SUCCESS; } - if ($now->subMinute()->greaterThan($lastTrimmedDatabaseAt)) { - $this->comment('Trimming the database at '.$now->toDateTimeString()); - + if ($now->subMinute()->greaterThan($lastTrimmedStorageAt)) { $storage->retain(Interval::week()); - $lastTrimmedDatabaseAt = $now; + $lastTrimmedStorageAt = $now; + + $this->comment('Storage trimmed'); } $processed = $ingest->store($storage, 1000); if ($processed === 0) { - $this->comment('Queue finished processing. Sleeping at '.$now->toDateTimeString()); - Sleep::for(1)->second(); } else { $this->comment('Processed ['.$processed.'] entries at '.$now->toDateTimeString()); diff --git a/src/Http/Livewire/Servers.php b/src/Http/Livewire/Servers.php index 062e1b93..f48e444a 100644 --- a/src/Http/Livewire/Servers.php +++ b/src/Http/Livewire/Servers.php @@ -116,7 +116,7 @@ protected function servers(): Collection 'cpu_percent' => $server->cpu_percent, 'memory_used' => $server->memory_used, 'memory_total' => $server->memory_total, - 'storage' => json_decode($server->storage), + 'storage' => json_decode($server->storage, flags: JSON_THROW_ON_ERROR), 'readings' => $serverReadings->get($server->server)?->map(fn ($reading) => (object) [ 'cpu_percent' => $reading->cpu_percent, 'memory_used' => $reading->memory_used, diff --git a/src/Ingests/Redis.php b/src/Ingests/Redis.php index 32eeb43f..8ab7a689 100644 --- a/src/Ingests/Redis.php +++ b/src/Ingests/Redis.php @@ -38,7 +38,7 @@ public function ingest(Collection $entries, Collection $updates): void $entries->groupBy('table') ->each(fn ($rows, $table) => $rows->each(fn ($data) => $pipeline->xadd($this->stream, [ 'type' => $table, - 'data' => json_encode($data), + 'data' => json_encode($data, flags: JSON_THROW_ON_ERROR), ]))); $updates->each(fn ($update) => $pipeline->xadd($this->stream, [ @@ -73,7 +73,9 @@ public function store(Storage $storage, int $count): int ->values() ->partition(fn ($entry) => $entry['type'] !== 'pulse_update'); - $inserts = $inserts->map(fn ($data): Entry => new Entry($data['type'], json_decode($data['data'], true))); + $inserts = $inserts->map(fn ($data) => with(json_decode($data['data'], true, flags: JSON_THROW_ON_ERROR), function ($data) { + return new Entry($data['table'], $data['attributes']); + })); $updates = $updates->map(fn ($data): Update => unserialize($data['data'])); From c9f1f7eb7ab525519997a878c2158410abe6dbb3 Mon Sep 17 00:00:00 2001 From: timacdonald Date: Mon, 14 Aug 2023 06:52:58 +0000 Subject: [PATCH 05/26] Fix code styling --- src/Checks/SystemStats.php | 2 -- src/Commands/CheckCommand.php | 5 +---- src/PulseServiceProvider.php | 2 +- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/Checks/SystemStats.php b/src/Checks/SystemStats.php index 210e6a55..22cedd8b 100644 --- a/src/Checks/SystemStats.php +++ b/src/Checks/SystemStats.php @@ -2,9 +2,7 @@ namespace Laravel\Pulse\Checks; -use Carbon\CarbonImmutable; use Illuminate\Support\Facades\Config; -use Laravel\Pulse\Facades\Pulse; use RuntimeException; class SystemStats diff --git a/src/Commands/CheckCommand.php b/src/Commands/CheckCommand.php index 8a0cd42d..179c10d2 100644 --- a/src/Commands/CheckCommand.php +++ b/src/Commands/CheckCommand.php @@ -11,8 +11,6 @@ use Laravel\Pulse\Checks\SystemStats; use Laravel\Pulse\Entries\Entry; use Laravel\Pulse\Facades\Pulse; -use Laravel\Pulse\Handlers\HandleSystemStats; -use RuntimeException; use Symfony\Component\Console\Attribute\AsCommand; #[AsCommand(name: 'pulse:check')] @@ -43,8 +41,7 @@ class CheckCommand extends Command public function handle( SystemStats $systemStats, QueueSize $queueSize, - ): int - { + ): int { $lastRestart = Cache::get('illuminate:pulse:restart'); $lastSnapshotAt = (new CarbonImmutable)->floorSeconds($this->interval); diff --git a/src/PulseServiceProvider.php b/src/PulseServiceProvider.php index d2c9d395..055f0fad 100644 --- a/src/PulseServiceProvider.php +++ b/src/PulseServiceProvider.php @@ -80,7 +80,7 @@ public function register(): void $ingestConfig = Config::get("pulse.ingest.{$driver}"); $redisConfig = [ - ...Config::get("database.redis.options"), + ...Config::get('database.redis.options'), ...Config::get("database.redis.{$ingestConfig['connection']}"), ...$ingestConfig, ]; From 81513ffbc92b7aebb8354356372caf8f1170bfef Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Mon, 14 Aug 2023 16:52:58 +1000 Subject: [PATCH 06/26] wip --- :W | 216 ------------------------------------------------------------- 1 file changed, 216 deletions(-) delete mode 100644 :W diff --git a/:W b/:W deleted file mode 100644 index 307200e7..00000000 --- a/:W +++ /dev/null @@ -1,216 +0,0 @@ -filters = collect([]); - $this->entriesQueue = collect([]); - $this->updatesQueue = collect([]); - } - - /** - * Stop recording entries. - */ - public function shouldNotRecord(): self - { - $this->shouldRecord = false; - - return $this; - } - - /** - * Filter incoming entries using the provided filter. - */ - public function filter(callable $filter) - { - $this->filters[] = $filter; - - return $this; - } - - /** - * Resolve the user's details using the given closure. - */ - public function resolveUsersUsing($callback): self - { - $this->usersResolver = $callback; - - return $this; - } - - /** - * Resolve the user's details using the given closure. - */ - public function resolveUsers(Collection $ids): Collection - { - if ($this->usersResolver) { - return collect(($this->usersResolver)($ids)); - } - - if (class_exists(\App\Models\User::class)) { - return \App\Models\User::findMany($ids); - } - - if (class_exists(\App\User::class)) { - return \App\User::findMany($ids); - } - - return $ids->map(fn ($id) => [ - 'id' => $id, - ]); - } - - /** - * Record the given entry. - */ - public function record(Entry $entry): void - { - $this->entriesQueue[] = $entry; - } - - /** - * Record the given entry update. - */ - public function recordUpdate(Update $update): void - { - $this->updatesQueue[] = $update; - } - - /** - * Store the queued entries and flush the queue. - */ - public function store(): void - { - // TODO: logging? report? We should have a "handlePulseExceptionsUsing" - // and allow user-land control, but by default we just ignore. - rescue(fn () => $this->ingest->ingest( - $this->entriesQueue->filter($this->shouldRecord(...)), - $this->updatesQueue->filter($this->shouldRecord(...)), - ), report: false); - - // TODO: lottery configuration? - rescue(fn () => Lottery::odds(1, 100) - ->winner(fn () => $this->ingest->trim((new CarbonImmutable)->subWeek())) - ->choose(), report: false); - - $this->entriesQueue = collect([]); - $this->updatesQueue = collect([]); - } - - /** - * Return the compiled CSS from the vendor directory. - */ - public function css(): string - { - return file_get_contents(__DIR__.'/../dist/pulse.css'); - } - - /** - * Return the compiled JavaScript from the vendor directory. - */ - public function js(): string - { - return file_get_contents(__DIR__.'/../dist/pulse.js'); - } - - /** - * Determine if the given request can access the Pulse dashboard. - */ - public function check(Request $request): bool - { - return ($this->authUsing ?: function () { - return App::environment('local'); - })($request); - } - - /** - * Set the callback that should be used to authorize Pulse users. - */ - public function auth(callable $callback): self - { - $this->authUsing = $callback; - - return $this; - } - - /** - * Configure Pulse to not register its migrations. - */ - public function ignoreMigrations(): self - { - $this->runsMigrations = false; - - return $this; - } - - /** - * Determine if Pulse may run migrations. - */ - public function runsMigrations(): bool - { - return $this->runsMigrations; - } - - /** - * Determine if the entry should be recorded. - */ - protected function shouldRecord(Entry|Update $entry): bool - { - return $this->shouldRecord && $this->filters->every(fn (callable $filter) => $filter($entry)); - } -} From c8ac3b5c526df5501c83e1a634e30c9df2f5b965 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Mon, 14 Aug 2023 16:54:39 +1000 Subject: [PATCH 07/26] wip --- composer.json | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/composer.json b/composer.json index ac9681f9..3d10723d 100644 --- a/composer.json +++ b/composer.json @@ -25,7 +25,6 @@ "nesbot/carbon": "^2.67" }, "require-dev": { - "laravel/facade-documenter": "dev-main", "mockery/mockery": "^1.0", "orchestra/testbench": "^8.0", "pestphp/pest": "^2.0", @@ -65,11 +64,5 @@ } }, "minimum-stability": "dev", - "prefer-stable": true, - "repositories": { - "facade-documenter": { - "type": "vcs", - "url": "git@github.com:laravel-labs/facade-documenter.git" - } - } + "prefer-stable": true } From 859c2ef23b442cdc5f134dc35c665a3573e0a62b Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Mon, 14 Aug 2023 16:56:29 +1000 Subject: [PATCH 08/26] wip --- src/Handlers/HandleSystemStats.php | 38 ------------------------------ 1 file changed, 38 deletions(-) delete mode 100644 src/Handlers/HandleSystemStats.php diff --git a/src/Handlers/HandleSystemStats.php b/src/Handlers/HandleSystemStats.php deleted file mode 100644 index 43154a21..00000000 --- a/src/Handlers/HandleSystemStats.php +++ /dev/null @@ -1,38 +0,0 @@ - $now->toDateTimeString(), - 'server' => config('pulse.server_name'), - ...match (PHP_OS_FAMILY) { - 'Darwin' => [ - 'cpu_percent' => (int) `top -l 1 | grep -E "^CPU" | tail -1 | awk '{ print $3 + $5 }'`, - 'memory_total' => $memoryTotal = intval(`sysctl hw.memsize | grep -Eo '[0-9]+'` / 1024 / 1024), // MB - 'memory_used' => $memoryTotal - intval(intval(`vm_stat | grep 'Pages free' | grep -Eo '[0-9]+'`) * intval(`pagesize`) / 1024 / 1024), // MB - ], - 'Linux' => [ - 'cpu_percent' => (int) `top -bn1 | grep '%Cpu(s)' | tail -1 | grep -Eo '[0-9]+\.[0-9]+' | head -n 4 | tail -1 | awk '{ print 100 - $1 }'`, - 'memory_total' => $memoryTotal = intval(`cat /proc/meminfo | grep MemTotal | grep -E -o '[0-9]+'` / 1024), // MB - 'memory_used' => $memoryTotal - intval(`cat /proc/meminfo | grep MemAvailable | grep -E -o '[0-9]+'` / 1024), // MB - ], - default => throw new RuntimeException('The pulse:check command does not currently support '.PHP_OS_FAMILY), - }, - 'storage' => collect(config('pulse.directories'))->map(fn ($directory) => [ - 'directory' => $directory, - 'total' => $total = intval(round(disk_total_space($directory) / 1024 / 1024)), // MB - 'used' => intval(round($total - (disk_free_space($directory) / 1024 / 1024))), // MB - ])->toJson(), - ])); - - return $stats; - } -} From 2615af531cd8ca2323f801452e5bceb585ca329e Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Mon, 14 Aug 2023 17:07:34 +1000 Subject: [PATCH 09/26] wip --- src/Checks/QueueSize.php | 3 +++ src/Checks/SystemStats.php | 3 +++ src/Entries/Entry.php | 2 ++ src/Entries/Type.php | 2 ++ src/Handlers/HandleOutgoingRequest.php | 3 +++ src/Http/Livewire/Usage.php | 2 +- src/Ingests/Redis.php | 5 +++++ src/Ingests/Storage.php | 6 ++++++ src/Pulse.php | 2 +- src/Storage/Database.php | 13 ++++++++++++- 10 files changed, 38 insertions(+), 3 deletions(-) diff --git a/src/Checks/QueueSize.php b/src/Checks/QueueSize.php index 201c8509..96ed4fa7 100644 --- a/src/Checks/QueueSize.php +++ b/src/Checks/QueueSize.php @@ -8,6 +8,9 @@ class QueueSize { + /** + * Resolve the queue size. + */ public function __invoke(): Collection { return collect(Config::get('pulse.queues'))->map(fn ($queue) => [ diff --git a/src/Checks/SystemStats.php b/src/Checks/SystemStats.php index 22cedd8b..871cdda0 100644 --- a/src/Checks/SystemStats.php +++ b/src/Checks/SystemStats.php @@ -7,6 +7,9 @@ class SystemStats { + /** + * Resolve the systems stats. + */ public function __invoke(): array { return [ diff --git a/src/Entries/Entry.php b/src/Entries/Entry.php index 6d1d668d..abd037ba 100644 --- a/src/Entries/Entry.php +++ b/src/Entries/Entry.php @@ -6,6 +6,8 @@ class Entry { /** * Create a new Entry instance. + * + * @param array $attributes */ public function __construct(public string $table, public array $attributes) { diff --git a/src/Entries/Type.php b/src/Entries/Type.php index 75bc5aa8..9fe8ed0f 100644 --- a/src/Entries/Type.php +++ b/src/Entries/Type.php @@ -17,6 +17,8 @@ enum Type: string /** * Get all cases as a Collection. + * + * @return \Illuminate\Support\Collection */ public static function all(): Collection { diff --git a/src/Handlers/HandleOutgoingRequest.php b/src/Handlers/HandleOutgoingRequest.php index 2c17b89b..6f4647f3 100644 --- a/src/Handlers/HandleOutgoingRequest.php +++ b/src/Handlers/HandleOutgoingRequest.php @@ -3,6 +3,7 @@ namespace Laravel\Pulse\Handlers; use Carbon\CarbonImmutable; +use GuzzleHttp\Promise\PromiseInterface; use GuzzleHttp\Promise\RejectedPromise; use Illuminate\Support\Facades\Auth; use Illuminate\Support\Str; @@ -14,6 +15,8 @@ class HandleOutgoingRequest { /** * Invoke the middleware. + * + * @param (callable(\Psr\Http\Message\RequestInterface, array): PromiseInterface) $handler */ public function __invoke(callable $handler) { diff --git a/src/Http/Livewire/Usage.php b/src/Http/Livewire/Usage.php index 33dc744c..196f6308 100644 --- a/src/Http/Livewire/Usage.php +++ b/src/Http/Livewire/Usage.php @@ -102,7 +102,7 @@ protected function userRequestCounts(): array 'name' => $user['name'], // "extra" rather than 'email' // avatar for pretty-ness? - 'email' => $user['email'], + 'email' => $user['email'] ?? null, ], ] : null; }) diff --git a/src/Ingests/Redis.php b/src/Ingests/Redis.php index 8ab7a689..39314aee 100644 --- a/src/Ingests/Redis.php +++ b/src/Ingests/Redis.php @@ -19,6 +19,8 @@ class Redis implements Ingest /** * Create a new Redis Ingest instance. + * + * @param array $config */ public function __construct(protected array $config, protected RedisConnection $connection) { @@ -27,6 +29,9 @@ public function __construct(protected array $config, protected RedisConnection $ /** * Ingest the entries and updates. + * + * @param \Illuminate\Support\Collection $entries + * @param \Illuminate\Support\Collection $updates */ public function ingest(Collection $entries, Collection $updates): void { diff --git a/src/Ingests/Storage.php b/src/Ingests/Storage.php index 3e79f9d0..7d1d9ae8 100644 --- a/src/Ingests/Storage.php +++ b/src/Ingests/Storage.php @@ -10,6 +10,9 @@ class Storage implements Ingest { + /** + * Create a new Storage Ingest instance. + */ public function __construct(protected StorageContract $storage) { // @@ -17,6 +20,9 @@ public function __construct(protected StorageContract $storage) /** * Ingest the entries and updates. + * + * @param \Illuminate\Support\Collection $entries + * @param \Illuminate\Support\Collection $updates */ public function ingest(Collection $entries, Collection $updates): void { diff --git a/src/Pulse.php b/src/Pulse.php index ae29be2e..0ee41b27 100644 --- a/src/Pulse.php +++ b/src/Pulse.php @@ -46,7 +46,7 @@ class Pulse /** * Users resolver. * - * @var (callable(\Illuminate\Support\Collection): iterable)|null + * @var (callable(\Illuminate\Support\Collection): iterable)|null */ protected $usersResolver; diff --git a/src/Storage/Database.php b/src/Storage/Database.php index c85fbc33..d28f2c83 100644 --- a/src/Storage/Database.php +++ b/src/Storage/Database.php @@ -12,6 +12,11 @@ class Database implements Storage { + /** + * Create a new Database Storage instance. + * + * @param array $config + */ public function __construct(protected array $config, protected DatabaseManager $manager) { // @@ -19,6 +24,9 @@ public function __construct(protected array $config, protected DatabaseManager $ /** * Store the entries and updates. + * + * @param \Illuminate\Support\Collection $entries + * @param \Illuminate\Support\Collection $updates */ public function store(Collection $entries, Collection $updates): void { @@ -31,7 +39,7 @@ public function store(Collection $entries, Collection $updates): void ->map(fn ($inserts) => $inserts->pluck('attributes')->all()) ->each($this->connection()->table($table)->insert(...))); - $updates->each(fn ($update) => $update->perform($db)); + $updates->each(fn ($update) => $update->perform($this->connection())); }); } @@ -45,6 +53,9 @@ public function retain(Interval $interval): void ->delete()); } + /** + * Get the database connection. + */ protected function connection(): Connection { return $this->manager->connection($this->config['connection']); From af66aee306cc881b0a593d1b129108efa652aba3 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 13:01:55 +1000 Subject: [PATCH 10/26] wip --- config/pulse.php | 3 +++ phpstan-baseline.neon | 11 +++++++++++ phpstan.neon.dist | 5 +++-- src/Commands/WorkCommand.php | 2 +- src/Contracts/Ingest.php | 4 ++-- src/Contracts/Storage.php | 4 ++-- src/Handlers/HandleQuery.php | 4 ++-- src/Http/Livewire/Concerns/HasPeriod.php | 4 ++-- src/Ingests/Redis.php | 12 ++++++++++-- src/Ingests/Storage.php | 6 +++--- src/Pulse.php | 3 ++- src/Redis.php | 20 ++------------------ src/Storage/Database.php | 14 +++++++++++--- 13 files changed, 54 insertions(+), 38 deletions(-) create mode 100644 phpstan-baseline.neon diff --git a/config/pulse.php b/config/pulse.php index 7ab81e17..810f13a9 100644 --- a/config/pulse.php +++ b/config/pulse.php @@ -1,5 +1,6 @@ [ 'connection' => env('PULSE_DB_CONNECTION') ?? env('DB_CONNECTION') ?? 'mysql', + 'trim_after' => Interval::days(7), ], ], @@ -32,6 +34,7 @@ 'redis' => [ 'connection' => env('PULSE_REDIS_CONNECTION') ?? 'default', + 'trim_after' => Interval::days(7), ], ], diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon new file mode 100644 index 00000000..db076b54 --- /dev/null +++ b/phpstan-baseline.neon @@ -0,0 +1,11 @@ +parameters: + ignoreErrors: + - + message: "#^Call to an undefined static method Livewire\\\\Livewire\\:\\:addPersistentMiddleware\\(\\)\\.$#" + count: 1 + path: src/PulseServiceProvider.php + + - + message: "#^Call to an undefined static method Livewire\\\\Livewire\\:\\:listen\\(\\)\\.$#" + count: 1 + path: src/PulseServiceProvider.php diff --git a/phpstan.neon.dist b/phpstan.neon.dist index 70062030..6ab4e4df 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -1,6 +1,7 @@ parameters: paths: - - config - src - level: 0 + level: 6 +includes: + - phpstan-baseline.neon diff --git a/src/Commands/WorkCommand.php b/src/Commands/WorkCommand.php index dd585505..0e58f48b 100644 --- a/src/Commands/WorkCommand.php +++ b/src/Commands/WorkCommand.php @@ -45,7 +45,7 @@ public function handle(Ingest $ingest, Storage $storage): int } if ($now->subMinute()->greaterThan($lastTrimmedStorageAt)) { - $storage->retain(Interval::week()); + $storage->trim(); $lastTrimmedStorageAt = $now; diff --git a/src/Contracts/Ingest.php b/src/Contracts/Ingest.php index e6108d9d..9e5fb450 100644 --- a/src/Contracts/Ingest.php +++ b/src/Contracts/Ingest.php @@ -16,9 +16,9 @@ interface Ingest public function ingest(Collection $entries, Collection $updates): void; /** - * Retain the ingested entries only for the given interval. + * Trim the ingested entries. */ - public function retain(Interval $interval): void; + public function trim(): void; /** * Store the ingested entries. diff --git a/src/Contracts/Storage.php b/src/Contracts/Storage.php index a1316cc5..a5d7f530 100644 --- a/src/Contracts/Storage.php +++ b/src/Contracts/Storage.php @@ -16,7 +16,7 @@ interface Storage public function store(Collection $entries, Collection $updates): void; /** - * Retain the stored entries only for the given interval. + * Trim the stored entries. */ - public function retain(Interval $interval): void; + public function trim(): void; } diff --git a/src/Handlers/HandleQuery.php b/src/Handlers/HandleQuery.php index 8181d15a..8a7432b6 100644 --- a/src/Handlers/HandleQuery.php +++ b/src/Handlers/HandleQuery.php @@ -24,10 +24,10 @@ public function __invoke(QueryExecuted $event): void } Pulse::record(new Entry('pulse_queries', [ - 'date' => $now->subMilliseconds(round($event->time))->toDateTimeString(), + 'date' => $now->subMilliseconds((int) $event->time)->toDateTimeString(), 'user_id' => Auth::id(), 'sql' => $event->sql, - 'duration' => round($event->time), + 'duration' => (int) $event->time, ])); }); } diff --git a/src/Http/Livewire/Concerns/HasPeriod.php b/src/Http/Livewire/Concerns/HasPeriod.php index d984b51c..c419e851 100644 --- a/src/Http/Livewire/Concerns/HasPeriod.php +++ b/src/Http/Livewire/Concerns/HasPeriod.php @@ -11,7 +11,7 @@ trait HasPeriod /** * The usage period. * - * @var '1_hour'|6_hours'|'24_hours'|'7_days'|null + * @var '1_hour'|'6_hours'|'24_hours'|'7_days'|null */ #[Url] public ?string $period = '1_hour'; @@ -19,7 +19,7 @@ trait HasPeriod /** * Handle the period-changed event. * - * @param '1_hour'|6_hours'|'24_hours'|'7_days' $period + * @param '1_hour'|'6_hours'|'24_hours'|'7_days' $period */ #[On('period-changed')] public function updatePeriod(string $period): void diff --git a/src/Ingests/Redis.php b/src/Ingests/Redis.php index 39314aee..7ca1dfca 100644 --- a/src/Ingests/Redis.php +++ b/src/Ingests/Redis.php @@ -56,9 +56,17 @@ public function ingest(Collection $entries, Collection $updates): void /** * Trim the ingested entries. */ - public function retain(Interval $interval): void + public function trim(): void { - $this->connection->xtrim($this->stream, 'MINID', '~', $this->connection->streamIdAt($interval->copy()->invert())); + $this->connection->xtrim($this->stream, 'MINID', '~', $this->connection->streamIdAt($this->trimAfter()->invert())); + } + + /** + * The interval to trim the storage to. + */ + protected function trimAfter(): Interval + { + return new Interval($this->config['trim_after'] ?? 'P7D'); } /** diff --git a/src/Ingests/Storage.php b/src/Ingests/Storage.php index 7d1d9ae8..85a9255b 100644 --- a/src/Ingests/Storage.php +++ b/src/Ingests/Storage.php @@ -30,11 +30,11 @@ public function ingest(Collection $entries, Collection $updates): void } /** - * Retain the ingested entries only for the given interval. + * Trim the ingested entries. */ - public function retain(Interval $interval): void + public function trim(): void { - $this->storage->retain($interval); + $this->storage->trim(); } /** diff --git a/src/Pulse.php b/src/Pulse.php index 0ee41b27..56dcb7e8 100644 --- a/src/Pulse.php +++ b/src/Pulse.php @@ -3,6 +3,7 @@ namespace Laravel\Pulse; use Carbon\CarbonImmutable; +use Carbon\CarbonInterval as Interval; use Closure; use Illuminate\Http\Request; use Illuminate\Support\Collection; @@ -143,7 +144,7 @@ public function store(): self // TODO: lottery configuration? $this->rescue(fn () => Lottery::odds(1, 100) - ->winner(fn () => $this->ingest->retain((new CarbonImmutable)->subWeek())) + ->winner(fn () => $this->ingest->trim()) ->choose()); return $this->clearQueue(); diff --git a/src/Redis.php b/src/Redis.php index a3efa270..d8e34ae7 100644 --- a/src/Redis.php +++ b/src/Redis.php @@ -35,7 +35,7 @@ public function __construct(protected array $config, protected ?RedisManager $ma */ public function xadd($key, $dictionary) { - if ($this->isPhpRedis()) { + if ($this->client() instanceof PhpRedis) { return $this->client()->xAdd($key, '*', $dictionary); } @@ -55,7 +55,7 @@ public function xrange($key, $start, $end, $count = null) */ public function xtrim($key, $strategy, $strategyModifier, $threshold) { - if ($this->isPhpRedis()) { + if ($this->client() instanceof PhpRedis) { // PHP Redis does not support the minid strategy. return $this->client()->rawCommand('XTRIM', $this->config['prefix'].$key, $strategy, $strategyModifier, $threshold); } @@ -87,22 +87,6 @@ public function streamIdAt(Interval $interval): string return (string) ($redisTimestamp + $interval->totalMilliseconds); } - /** - * Determine if the client is PhpRedis. - */ - protected function isPhpRedis(): bool - { - return $this->client() instanceof PhpRedis; - } - - /** - * Determine if the client is Predis. - */ - protected function isPredis(): bool - { - return $this->client() instanceof Predis; - } - /** * The connections client. */ diff --git a/src/Storage/Database.php b/src/Storage/Database.php index d28f2c83..969b914a 100644 --- a/src/Storage/Database.php +++ b/src/Storage/Database.php @@ -44,15 +44,23 @@ public function store(Collection $entries, Collection $updates): void } /** - * Retain the ingested entries only for the given interval. + * Trim the stored entries. */ - public function retain(Interval $interval): void + public function trim(): void { Type::all()->each(fn (Type $type) => $this->connection()->table($type->value) - ->where('date', '<', (new CarbonImmutable)->subSeconds($interval->seconds)->toDateTimeString()) + ->where('date', '<', (new CarbonImmutable)->subSeconds((int) $this->trimAfter()->totalSeconds)->toDateTimeString()) ->delete()); } + /** + * The interval to trim the storage to. + */ + protected function trimAfter(): Interval + { + return new Interval($this->config['trim_after'] ?? 'P7D'); + } + /** * Get the database connection. */ From 6bc314789391fa4f7b1bad3d9f06ea50b6b74e14 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 13:16:38 +1000 Subject: [PATCH 11/26] wip --- phpstan.neon.dist | 2 +- src/Checks/QueueSize.php | 2 ++ src/Checks/SystemStats.php | 9 ++++++--- src/Commands/CheckCommand.php | 2 +- src/Handlers/HandleOutgoingRequest.php | 6 ++++-- src/Http/Livewire/Cache.php | 2 +- src/Http/Livewire/Exceptions.php | 2 +- src/Http/Livewire/Queues.php | 2 +- src/Http/Livewire/Servers.php | 2 +- src/Http/Livewire/SlowJobs.php | 2 +- src/Http/Livewire/SlowOutgoingRequests.php | 2 +- src/Http/Livewire/SlowQueries.php | 2 +- src/Http/Livewire/SlowRoutes.php | 2 +- src/Http/Livewire/Usage.php | 2 +- src/ListensForStorageOpportunities.php | 4 ++-- 15 files changed, 25 insertions(+), 18 deletions(-) diff --git a/phpstan.neon.dist b/phpstan.neon.dist index 6ab4e4df..a454cbf3 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -2,6 +2,6 @@ parameters: paths: - src - level: 6 + level: 7 includes: - phpstan-baseline.neon diff --git a/src/Checks/QueueSize.php b/src/Checks/QueueSize.php index 96ed4fa7..fe82a6b4 100644 --- a/src/Checks/QueueSize.php +++ b/src/Checks/QueueSize.php @@ -10,6 +10,8 @@ class QueueSize { /** * Resolve the queue size. + * + * @return \Illuminate\Support\Collection> */ public function __invoke(): Collection { diff --git a/src/Checks/SystemStats.php b/src/Checks/SystemStats.php index 871cdda0..3ca8fa91 100644 --- a/src/Checks/SystemStats.php +++ b/src/Checks/SystemStats.php @@ -2,6 +2,7 @@ namespace Laravel\Pulse\Checks; +use Illuminate\Support\Collection; use Illuminate\Support\Facades\Config; use RuntimeException; @@ -9,10 +10,12 @@ class SystemStats { /** * Resolve the systems stats. + * + * @return \Illuminate\Support\Collection */ - public function __invoke(): array + public function __invoke(): Collection { - return [ + return collect([ 'server' => Config::get('pulse.server_name'), ...match (PHP_OS_FAMILY) { 'Darwin' => [ @@ -32,6 +35,6 @@ public function __invoke(): array 'total' => $total = intval(round(disk_total_space($directory) / 1024 / 1024)), // MB 'used' => intval(round($total - (disk_free_space($directory) / 1024 / 1024))), // MB ])->toJson(), - ]; + ]); } } diff --git a/src/Commands/CheckCommand.php b/src/Commands/CheckCommand.php index 179c10d2..bafb5055 100644 --- a/src/Commands/CheckCommand.php +++ b/src/Commands/CheckCommand.php @@ -70,7 +70,7 @@ public function handle( ...$stats = $systemStats(), ])); - $this->line('[system stats] '.json_encode($stats, flags: JSON_THROW_ON_ERROR)); + $this->line('[system stats] '.$stats->toJson()); /* * Collect queue sizes. diff --git a/src/Handlers/HandleOutgoingRequest.php b/src/Handlers/HandleOutgoingRequest.php index 6f4647f3..8d308253 100644 --- a/src/Handlers/HandleOutgoingRequest.php +++ b/src/Handlers/HandleOutgoingRequest.php @@ -3,6 +3,7 @@ namespace Laravel\Pulse\Handlers; use Carbon\CarbonImmutable; +use Closure; use GuzzleHttp\Promise\PromiseInterface; use GuzzleHttp\Promise\RejectedPromise; use Illuminate\Support\Facades\Auth; @@ -16,9 +17,10 @@ class HandleOutgoingRequest /** * Invoke the middleware. * - * @param (callable(\Psr\Http\Message\RequestInterface, array): PromiseInterface) $handler + * @param (callable(\Psr\Http\Message\RequestInterface, array): PromiseInterface) $handler + * @return (callable(\Psr\Http\Message\RequestInterface, array): PromiseInterface) */ - public function __invoke(callable $handler) + public function __invoke(callable $handler): callable { return function ($request, $options) use ($handler) { $startedAt = new CarbonImmutable; diff --git a/src/Http/Livewire/Cache.php b/src/Http/Livewire/Cache.php index c6129a33..aadcf624 100644 --- a/src/Http/Livewire/Cache.php +++ b/src/Http/Livewire/Cache.php @@ -39,7 +39,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/Exceptions.php b/src/Http/Livewire/Exceptions.php index dc4e3103..d522a7e2 100644 --- a/src/Http/Livewire/Exceptions.php +++ b/src/Http/Livewire/Exceptions.php @@ -42,7 +42,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/Queues.php b/src/Http/Livewire/Queues.php index b7cc350e..b7dca68b 100644 --- a/src/Http/Livewire/Queues.php +++ b/src/Http/Livewire/Queues.php @@ -26,7 +26,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/Servers.php b/src/Http/Livewire/Servers.php index f48e444a..83f9f9a2 100644 --- a/src/Http/Livewire/Servers.php +++ b/src/Http/Livewire/Servers.php @@ -39,7 +39,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-6']); } diff --git a/src/Http/Livewire/SlowJobs.php b/src/Http/Livewire/SlowJobs.php index 9932c21c..c4d01949 100644 --- a/src/Http/Livewire/SlowJobs.php +++ b/src/Http/Livewire/SlowJobs.php @@ -33,7 +33,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/SlowOutgoingRequests.php b/src/Http/Livewire/SlowOutgoingRequests.php index 0fb383d4..2ca9f853 100644 --- a/src/Http/Livewire/SlowOutgoingRequests.php +++ b/src/Http/Livewire/SlowOutgoingRequests.php @@ -33,7 +33,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/SlowQueries.php b/src/Http/Livewire/SlowQueries.php index 6f2faef8..229e3de3 100644 --- a/src/Http/Livewire/SlowQueries.php +++ b/src/Http/Livewire/SlowQueries.php @@ -33,7 +33,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/SlowRoutes.php b/src/Http/Livewire/SlowRoutes.php index 7d8443fc..319b1278 100644 --- a/src/Http/Livewire/SlowRoutes.php +++ b/src/Http/Livewire/SlowRoutes.php @@ -34,7 +34,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/Usage.php b/src/Http/Livewire/Usage.php index 196f6308..40665672 100644 --- a/src/Http/Livewire/Usage.php +++ b/src/Http/Livewire/Usage.php @@ -60,7 +60,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/ListensForStorageOpportunities.php b/src/ListensForStorageOpportunities.php index 21f49080..f98b63e6 100644 --- a/src/ListensForStorageOpportunities.php +++ b/src/ListensForStorageOpportunities.php @@ -13,9 +13,9 @@ trait ListensForStorageOpportunities /** * An array indicating how many jobs are processing. * - * @var array + * @var array */ - protected static $processingJobs = []; + protected static array $processingJobs = []; /** * Register listeners that store the recorded Telescope entries. From 5fc06eaedc55af21d6c7b9083b51c4e3a7c00e87 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 13:17:04 +1000 Subject: [PATCH 12/26] wip --- src/Commands/WorkCommand.php | 1 - src/Contracts/Ingest.php | 1 - src/Contracts/Storage.php | 1 - src/Handlers/HandleOutgoingRequest.php | 1 - src/Ingests/Storage.php | 1 - src/Pulse.php | 2 -- 6 files changed, 7 deletions(-) diff --git a/src/Commands/WorkCommand.php b/src/Commands/WorkCommand.php index 0e58f48b..ca5dda7b 100644 --- a/src/Commands/WorkCommand.php +++ b/src/Commands/WorkCommand.php @@ -3,7 +3,6 @@ namespace Laravel\Pulse\Commands; use Carbon\CarbonImmutable; -use Carbon\CarbonInterval as Interval; use Illuminate\Console\Command; use Illuminate\Support\Facades\Cache; use Illuminate\Support\Sleep; diff --git a/src/Contracts/Ingest.php b/src/Contracts/Ingest.php index 9e5fb450..779a933d 100644 --- a/src/Contracts/Ingest.php +++ b/src/Contracts/Ingest.php @@ -2,7 +2,6 @@ namespace Laravel\Pulse\Contracts; -use Carbon\CarbonInterval as Interval; use Illuminate\Support\Collection; interface Ingest diff --git a/src/Contracts/Storage.php b/src/Contracts/Storage.php index a5d7f530..02230318 100644 --- a/src/Contracts/Storage.php +++ b/src/Contracts/Storage.php @@ -2,7 +2,6 @@ namespace Laravel\Pulse\Contracts; -use Carbon\CarbonInterval as Interval; use Illuminate\Support\Collection; interface Storage diff --git a/src/Handlers/HandleOutgoingRequest.php b/src/Handlers/HandleOutgoingRequest.php index 8d308253..332adeb4 100644 --- a/src/Handlers/HandleOutgoingRequest.php +++ b/src/Handlers/HandleOutgoingRequest.php @@ -3,7 +3,6 @@ namespace Laravel\Pulse\Handlers; use Carbon\CarbonImmutable; -use Closure; use GuzzleHttp\Promise\PromiseInterface; use GuzzleHttp\Promise\RejectedPromise; use Illuminate\Support\Facades\Auth; diff --git a/src/Ingests/Storage.php b/src/Ingests/Storage.php index 85a9255b..31f507cd 100644 --- a/src/Ingests/Storage.php +++ b/src/Ingests/Storage.php @@ -2,7 +2,6 @@ namespace Laravel\Pulse\Ingests; -use Carbon\CarbonInterval as Interval; use Illuminate\Support\Collection; use Laravel\Pulse\Contracts\Ingest; use Laravel\Pulse\Contracts\Storage as StorageContract; diff --git a/src/Pulse.php b/src/Pulse.php index 56dcb7e8..97960474 100644 --- a/src/Pulse.php +++ b/src/Pulse.php @@ -2,8 +2,6 @@ namespace Laravel\Pulse; -use Carbon\CarbonImmutable; -use Carbon\CarbonInterval as Interval; use Closure; use Illuminate\Http\Request; use Illuminate\Support\Collection; From 2f9131304647b4367d139f55fc20b6365b179159 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 13:24:01 +1000 Subject: [PATCH 13/26] wip --- phpstan.neon.dist | 2 +- src/Pulse.php | 2 +- src/Redis.php | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/phpstan.neon.dist b/phpstan.neon.dist index a454cbf3..9097fb79 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -2,6 +2,6 @@ parameters: paths: - src - level: 7 + level: 8 includes: - phpstan-baseline.neon diff --git a/src/Pulse.php b/src/Pulse.php index 97960474..0e1ba239 100644 --- a/src/Pulse.php +++ b/src/Pulse.php @@ -262,7 +262,7 @@ public function rescue(callable $callback): void try { $callback(); } catch (Throwable $e) { - ($this->handleExceptionsUsing)($e); + ($this->handleExceptionsUsing ?? fn () => null)($e); } } diff --git a/src/Redis.php b/src/Redis.php index d8e34ae7..943fb3e1 100644 --- a/src/Redis.php +++ b/src/Redis.php @@ -21,6 +21,7 @@ class Redis /** * Create a new Redis instance. * + * @param array{connection: string, prefix: string} $config * @param \Redis|\Predis\Client|\Predis\Pipeline\Pipeline|null $client */ public function __construct(protected array $config, protected ?RedisManager $manager = null, protected $client = null) @@ -82,7 +83,7 @@ public function streamIdAt(Interval $interval): string { $redisTime = $this->client()->time(); - $redisTimestamp = $redisTime[0].substr($redisTime[1], 0, 3); + $redisTimestamp = (int) ($redisTime[0].substr($redisTime[1], 0, 3)); return (string) ($redisTimestamp + $interval->totalMilliseconds); } From 5675ae69a1c887cf35c3b7e8b6dd632f54ddccd1 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 13:31:54 +1000 Subject: [PATCH 14/26] wip --- config/pulse.php | 3 +++ src/Ingests/Redis.php | 3 ++- src/Redis.php | 12 ------------ 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/config/pulse.php b/config/pulse.php index 810f13a9..25f653cb 100644 --- a/config/pulse.php +++ b/config/pulse.php @@ -23,6 +23,9 @@ 'database' => [ 'connection' => env('PULSE_DB_CONNECTION') ?? env('DB_CONNECTION') ?? 'mysql', + // TODO can we just use this instead of caring about Redis time? + // Then we can just use our time and this can be tweaked if needed + // to adjust for out of time sync issues. 'trim_after' => Interval::days(7), ], ], diff --git a/src/Ingests/Redis.php b/src/Ingests/Redis.php index 7ca1dfca..c152c2d1 100644 --- a/src/Ingests/Redis.php +++ b/src/Ingests/Redis.php @@ -2,6 +2,7 @@ namespace Laravel\Pulse\Ingests; +use Carbon\CarbonImmutable; use Carbon\CarbonInterval as Interval; use Illuminate\Support\Collection; use Laravel\Pulse\Contracts\Ingest; @@ -58,7 +59,7 @@ public function ingest(Collection $entries, Collection $updates): void */ public function trim(): void { - $this->connection->xtrim($this->stream, 'MINID', '~', $this->connection->streamIdAt($this->trimAfter()->invert())); + $this->connection->xtrim($this->stream, 'MINID', '~', (new CarbonImmutable)->subSeconds($this->trimAfter()->totalSeconds)->getTimestampMs()); } /** diff --git a/src/Redis.php b/src/Redis.php index 943fb3e1..be33eb23 100644 --- a/src/Redis.php +++ b/src/Redis.php @@ -76,18 +76,6 @@ public function pipeline(callable $closure): array }); } - /** - * Get the ID of the stream at a given time. - */ - public function streamIdAt(Interval $interval): string - { - $redisTime = $this->client()->time(); - - $redisTimestamp = (int) ($redisTime[0].substr($redisTime[1], 0, 3)); - - return (string) ($redisTimestamp + $interval->totalMilliseconds); - } - /** * The connections client. */ From 42d8f3c0d56c42de4b9746970c3e1928e7f9dfc3 Mon Sep 17 00:00:00 2001 From: timacdonald Date: Tue, 15 Aug 2023 03:32:23 +0000 Subject: [PATCH 15/26] Fix code styling --- src/Redis.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Redis.php b/src/Redis.php index be33eb23..84daa607 100644 --- a/src/Redis.php +++ b/src/Redis.php @@ -2,7 +2,6 @@ namespace Laravel\Pulse; -use Carbon\CarbonInterval as Interval; use Illuminate\Redis\Connections\Connection; use Illuminate\Redis\RedisManager; use Predis\Client as Predis; From b37704a32fbf020c329649948ceb4ae3f3968b6a Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 13:46:13 +1000 Subject: [PATCH 16/26] wip --- phpstan-baseline.neon | 11 ----------- phpstan.neon.dist | 2 -- 2 files changed, 13 deletions(-) delete mode 100644 phpstan-baseline.neon diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon deleted file mode 100644 index db076b54..00000000 --- a/phpstan-baseline.neon +++ /dev/null @@ -1,11 +0,0 @@ -parameters: - ignoreErrors: - - - message: "#^Call to an undefined static method Livewire\\\\Livewire\\:\\:addPersistentMiddleware\\(\\)\\.$#" - count: 1 - path: src/PulseServiceProvider.php - - - - message: "#^Call to an undefined static method Livewire\\\\Livewire\\:\\:listen\\(\\)\\.$#" - count: 1 - path: src/PulseServiceProvider.php diff --git a/phpstan.neon.dist b/phpstan.neon.dist index 9097fb79..099b4941 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -3,5 +3,3 @@ parameters: - src level: 8 -includes: - - phpstan-baseline.neon From ee3d7c9d943cb643223671b39a744a243200157a Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 13:46:59 +1000 Subject: [PATCH 17/26] wip --- config/pulse.php | 4 ++-- src/Ingests/Redis.php | 2 +- src/Storage/Database.php | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/config/pulse.php b/config/pulse.php index 25f653cb..b9399af7 100644 --- a/config/pulse.php +++ b/config/pulse.php @@ -26,7 +26,7 @@ // TODO can we just use this instead of caring about Redis time? // Then we can just use our time and this can be tweaked if needed // to adjust for out of time sync issues. - 'trim_after' => Interval::days(7), + 'retain' => Interval::days(7), ], ], @@ -37,7 +37,7 @@ 'redis' => [ 'connection' => env('PULSE_REDIS_CONNECTION') ?? 'default', - 'trim_after' => Interval::days(7), + 'retain' => Interval::days(7), ], ], diff --git a/src/Ingests/Redis.php b/src/Ingests/Redis.php index c152c2d1..f3d55ab6 100644 --- a/src/Ingests/Redis.php +++ b/src/Ingests/Redis.php @@ -67,7 +67,7 @@ public function trim(): void */ protected function trimAfter(): Interval { - return new Interval($this->config['trim_after'] ?? 'P7D'); + return new Interval($this->config['retain'] ?? 'P7D'); } /** diff --git a/src/Storage/Database.php b/src/Storage/Database.php index 969b914a..641d140d 100644 --- a/src/Storage/Database.php +++ b/src/Storage/Database.php @@ -58,7 +58,7 @@ public function trim(): void */ protected function trimAfter(): Interval { - return new Interval($this->config['trim_after'] ?? 'P7D'); + return new Interval($this->config['retain'] ?? 'P7D'); } /** From c85907ab4b6bc04d46323a543a5c84c1e08a8b08 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 14:57:41 +1000 Subject: [PATCH 18/26] wip --- src/Commands/CheckCommand.php | 5 +++-- src/Entries/Entry.php | 20 ++------------------ src/Entries/JobFinished.php | 6 +++--- src/Entries/JobStarted.php | 6 +++--- src/Entries/{Type.php => Table.php} | 2 +- src/Entries/Update.php | 18 +----------------- src/Handlers/HandleCacheInteraction.php | 3 ++- src/Handlers/HandleException.php | 3 ++- src/Handlers/HandleHttpRequest.php | 3 ++- src/Handlers/HandleOutgoingRequest.php | 3 ++- src/Handlers/HandleQuery.php | 3 ++- src/Handlers/HandleQueuedJob.php | 3 ++- src/Http/Livewire/PeriodSelector.php | 2 ++ src/Ingests/Redis.php | 6 +++--- src/Pulse.php | 7 ++++++- src/Storage/Database.php | 17 ++++++++++------- 16 files changed, 46 insertions(+), 61 deletions(-) rename src/Entries/{Type.php => Table.php} (96%) diff --git a/src/Commands/CheckCommand.php b/src/Commands/CheckCommand.php index bafb5055..4204c9f0 100644 --- a/src/Commands/CheckCommand.php +++ b/src/Commands/CheckCommand.php @@ -10,6 +10,7 @@ use Laravel\Pulse\Checks\QueueSize; use Laravel\Pulse\Checks\SystemStats; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; use Symfony\Component\Console\Attribute\AsCommand; @@ -65,7 +66,7 @@ public function handle( * Collect server stats. */ - Pulse::record(new Entry('pulse_servers', [ + Pulse::record(new Entry(Table::Server, [ 'date' => $lastSnapshotAt->toDateTimeString(), ...$stats = $systemStats(), ])); @@ -77,7 +78,7 @@ public function handle( */ if (Cache::lock("illuminate:pulse:check-queue-sizes:{$lastSnapshotAt->timestamp}", $this->interval)->get()) { - $sizes = $queueSize()->each(fn ($queue) => Pulse::record(new Entry('pulse_queue_sizes', [ + $sizes = $queueSize()->each(fn ($queue) => Pulse::record(new Entry(Table::QueueSize, [ 'date' => $lastSnapshotAt->toDateTimeString(), ...$queue, ]))); diff --git a/src/Entries/Entry.php b/src/Entries/Entry.php index abd037ba..94d78958 100644 --- a/src/Entries/Entry.php +++ b/src/Entries/Entry.php @@ -9,7 +9,7 @@ class Entry * * @param array $attributes */ - public function __construct(public string $table, public array $attributes) + public function __construct(public Table $table, public array $attributes) { // } @@ -17,24 +17,8 @@ public function __construct(public string $table, public array $attributes) /** * The entries table. */ - public function table(): string + public function table(): Table { return $this->table; } - - /** - * Determine if the update is the given type. - */ - public function is(Type $type): bool - { - return $this->type() === $type; - } - - /** - * The type of update. - */ - public function type(): Type - { - return Type::from($this->table()); - } } diff --git a/src/Entries/JobFinished.php b/src/Entries/JobFinished.php index e1fdea41..448f117c 100644 --- a/src/Entries/JobFinished.php +++ b/src/Entries/JobFinished.php @@ -22,7 +22,7 @@ public function __construct( */ public function perform(Connection $db): void { - $db->table($this->table()) + $db->table($this->table()->value) ->where('job_id', $this->jobId) ->update([ 'duration' => DB::raw('TIMESTAMPDIFF(MICROSECOND, `processing_started_at`, "'.$this->endedAt.'") / 1000'), @@ -32,8 +32,8 @@ public function perform(Connection $db): void /** * The update's table. */ - public function table(): string + public function table(): Table { - return 'pulse_jobs'; + return Table::Job; } } diff --git a/src/Entries/JobStarted.php b/src/Entries/JobStarted.php index be3675b6..edadbfa0 100644 --- a/src/Entries/JobStarted.php +++ b/src/Entries/JobStarted.php @@ -21,7 +21,7 @@ public function __construct( */ public function perform(Connection $db): void { - $db->table($this->table()) + $db->table($this->table()->value) ->where('job_id', $this->jobId) ->update([ 'processing_started_at' => $this->startedAt, @@ -31,8 +31,8 @@ public function perform(Connection $db): void /** * The update's table. */ - public function table(): string + public function table(): Table { - return 'pulse_jobs'; + return Table::Job; } } diff --git a/src/Entries/Type.php b/src/Entries/Table.php similarity index 96% rename from src/Entries/Type.php rename to src/Entries/Table.php index 9fe8ed0f..33784990 100644 --- a/src/Entries/Type.php +++ b/src/Entries/Table.php @@ -4,7 +4,7 @@ use Illuminate\Support\Collection; -enum Type: string +enum Table: string { case CacheHit = 'pulse_cache_hits'; case Exception = 'pulse_exceptions'; diff --git a/src/Entries/Update.php b/src/Entries/Update.php index a35b6d22..be67e28e 100644 --- a/src/Entries/Update.php +++ b/src/Entries/Update.php @@ -9,26 +9,10 @@ abstract class Update /** * The update's table. */ - abstract public function table(): string; + abstract public function table(): Table; /** * Perform the update. */ abstract public function perform(Connection $db): void; - - /** - * Determine if the update is the given type. - */ - public function is(Type $type): bool - { - return $this->type() === $type; - } - - /** - * The type of update. - */ - public function type(): Type - { - return Type::from($this->table()); - } } diff --git a/src/Handlers/HandleCacheInteraction.php b/src/Handlers/HandleCacheInteraction.php index 4671786c..f7d0159e 100644 --- a/src/Handlers/HandleCacheInteraction.php +++ b/src/Handlers/HandleCacheInteraction.php @@ -7,6 +7,7 @@ use Illuminate\Cache\Events\CacheMissed; use Illuminate\Support\Facades\Auth; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; class HandleCacheInteraction @@ -23,7 +24,7 @@ public function __invoke(CacheHit|CacheMissed $event): void return; } - Pulse::record(new Entry('pulse_cache_hits', [ + Pulse::record(new Entry(Table::CacheHit, [ 'date' => $now->toDateTimeString(), 'hit' => $event instanceof CacheHit, 'key' => $event->key, diff --git a/src/Handlers/HandleException.php b/src/Handlers/HandleException.php index 94afade6..800c4620 100644 --- a/src/Handlers/HandleException.php +++ b/src/Handlers/HandleException.php @@ -6,6 +6,7 @@ use Illuminate\Support\Facades\Auth; use Illuminate\Support\Str; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; use Throwable; @@ -19,7 +20,7 @@ public function __invoke(Throwable $e): void Pulse::rescue(function () use ($e) { $now = new CarbonImmutable(); - Pulse::record(new Entry('pulse_exceptions', [ + Pulse::record(new Entry(Table::Exception, [ 'date' => $now->toDateTimeString(), 'user_id' => Auth::id(), 'class' => $e::class, diff --git a/src/Handlers/HandleHttpRequest.php b/src/Handlers/HandleHttpRequest.php index b010fcd2..a88d372d 100644 --- a/src/Handlers/HandleHttpRequest.php +++ b/src/Handlers/HandleHttpRequest.php @@ -7,6 +7,7 @@ use Illuminate\Support\Carbon; use Illuminate\Support\Str; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; use Symfony\Component\HttpFoundation\Response; @@ -20,7 +21,7 @@ public function __invoke(Carbon $startedAt, Request $request, Response $response Pulse::rescue(function () use ($startedAt, $request) { $now = new CarbonImmutable(); - Pulse::record(new Entry('pulse_requests', [ + Pulse::record(new Entry(Table::Request, [ 'date' => $startedAt->toDateTimeString(), 'user_id' => $request->user()?->id, 'route' => $request->method().' '.Str::start(($request->route()?->uri() ?? $request->path()), '/'), diff --git a/src/Handlers/HandleOutgoingRequest.php b/src/Handlers/HandleOutgoingRequest.php index 332adeb4..4cd62819 100644 --- a/src/Handlers/HandleOutgoingRequest.php +++ b/src/Handlers/HandleOutgoingRequest.php @@ -8,6 +8,7 @@ use Illuminate\Support\Facades\Auth; use Illuminate\Support\Str; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; use Psr\Http\Message\RequestInterface; @@ -41,7 +42,7 @@ public function __invoke(callable $handler): callable */ protected function record(RequestInterface $request, CarbonImmutable $startedAt, CarbonImmutable $endedAt): void { - Pulse::record(new Entry('pulse_outgoing_requests', [ + Pulse::record(new Entry(Table::OutgoingRequest, [ 'uri' => $request->getMethod().' '.Str::before($request->getUri(), '?'), 'date' => $startedAt->toDateTimeString(), 'duration' => $startedAt->diffInMilliseconds($endedAt), diff --git a/src/Handlers/HandleQuery.php b/src/Handlers/HandleQuery.php index 8a7432b6..2e864716 100644 --- a/src/Handlers/HandleQuery.php +++ b/src/Handlers/HandleQuery.php @@ -7,6 +7,7 @@ use Illuminate\Support\Facades\Auth; use Illuminate\Support\Facades\Config; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; class HandleQuery @@ -23,7 +24,7 @@ public function __invoke(QueryExecuted $event): void return; } - Pulse::record(new Entry('pulse_queries', [ + Pulse::record(new Entry(Table::Query, [ 'date' => $now->subMilliseconds((int) $event->time)->toDateTimeString(), 'user_id' => Auth::id(), 'sql' => $event->sql, diff --git a/src/Handlers/HandleQueuedJob.php b/src/Handlers/HandleQueuedJob.php index 3d4acd68..f64763b2 100644 --- a/src/Handlers/HandleQueuedJob.php +++ b/src/Handlers/HandleQueuedJob.php @@ -6,6 +6,7 @@ use Illuminate\Queue\Events\JobQueued; use Illuminate\Support\Facades\Auth; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; class HandleQueuedJob @@ -18,7 +19,7 @@ public function __invoke(JobQueued $event): void Pulse::rescue(function () use ($event) { $now = new CarbonImmutable(); - Pulse::record(new Entry('pulse_jobs', [ + Pulse::record(new Entry(Table::Job, [ 'date' => $now->toDateTimeString(), 'user_id' => Auth::id(), 'job' => is_string($event->job) diff --git a/src/Http/Livewire/PeriodSelector.php b/src/Http/Livewire/PeriodSelector.php index 96502d6e..c3b6e4fc 100644 --- a/src/Http/Livewire/PeriodSelector.php +++ b/src/Http/Livewire/PeriodSelector.php @@ -27,6 +27,8 @@ public function render(): Renderable /** * Set the selected period. + * + * @param '1_hour'|'6_hours'|'24_hours'|'7_days' $period */ public function setPeriod(string $period): void { diff --git a/src/Ingests/Redis.php b/src/Ingests/Redis.php index f3d55ab6..1ed70aba 100644 --- a/src/Ingests/Redis.php +++ b/src/Ingests/Redis.php @@ -40,8 +40,8 @@ public function ingest(Collection $entries, Collection $updates): void return; } - $this->connection->pipeline(function (RedisConnection $pipeline) use ($entries, $updates) { - $entries->groupBy('table') + $this->connection->pipeline(function ($pipeline) use ($entries, $updates) { + $entries->groupBy('table.value') ->each(fn ($rows, $table) => $rows->each(fn ($data) => $pipeline->xadd($this->stream, [ 'type' => $table, 'data' => json_encode($data, flags: JSON_THROW_ON_ERROR), @@ -59,7 +59,7 @@ public function ingest(Collection $entries, Collection $updates): void */ public function trim(): void { - $this->connection->xtrim($this->stream, 'MINID', '~', (new CarbonImmutable)->subSeconds($this->trimAfter()->totalSeconds)->getTimestampMs()); + $this->connection->xtrim($this->stream, 'MINID', '~', (new CarbonImmutable)->subSeconds((int) $this->trimAfter()->totalSeconds)->getTimestampMs()); } /** diff --git a/src/Pulse.php b/src/Pulse.php index 0e1ba239..77aa02d6 100644 --- a/src/Pulse.php +++ b/src/Pulse.php @@ -153,11 +153,13 @@ public function store(): self */ protected function shouldRecord(Entry|Update $entry): bool { - return $this->filters->every(fn (callable $filter) => $filter($entry)); + return $this->filters->every(fn ($filter) => $filter($entry)); } /** * Resolve the user's details using the given closure. + * + * @param (callable(\Illuminate\Support\Collection): iterable) $callback */ public function resolveUsersUsing(callable $callback): self { @@ -168,6 +170,9 @@ public function resolveUsersUsing(callable $callback): self /** * Resolve the user's details using the given closure. + * + * @param \Illuminate\Support\Collection $ids + * @return \Illuminate\Support\Collection */ public function resolveUsers(Collection $ids): Collection { diff --git a/src/Storage/Database.php b/src/Storage/Database.php index 641d140d..86867974 100644 --- a/src/Storage/Database.php +++ b/src/Storage/Database.php @@ -8,7 +8,7 @@ use Illuminate\Database\DatabaseManager; use Illuminate\Support\Collection; use Laravel\Pulse\Contracts\Storage; -use Laravel\Pulse\Entries\Type; +use Laravel\Pulse\Entries\Table; class Database implements Storage { @@ -35,9 +35,10 @@ public function store(Collection $entries, Collection $updates): void } $this->connection()->transaction(function () use ($entries, $updates) { - $entries->groupBy('table')->each(fn ($rows, $table) => $rows->chunk(1000) - ->map(fn ($inserts) => $inserts->pluck('attributes')->all()) - ->each($this->connection()->table($table)->insert(...))); + $entries->groupBy('table.value') + ->each(fn ($rows, $table) => $rows->chunk(1000) + ->map(fn ($inserts) => $inserts->pluck('attributes')->all()) + ->each($this->connection()->table($table)->insert(...))); $updates->each(fn ($update) => $update->perform($this->connection())); }); @@ -48,9 +49,11 @@ public function store(Collection $entries, Collection $updates): void */ public function trim(): void { - Type::all()->each(fn (Type $type) => $this->connection()->table($type->value) - ->where('date', '<', (new CarbonImmutable)->subSeconds((int) $this->trimAfter()->totalSeconds)->toDateTimeString()) - ->delete()); + Table::all() + ->each(fn ($table) => $this->connection() + ->table($table->value) + ->where('date', '<', (new CarbonImmutable)->subSeconds((int) $this->trimAfter()->totalSeconds)->toDateTimeString()) + ->delete()); } /** From ac4a4ed9cd553b35d7eb03ea0a908eaf29161e6c Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 15:00:33 +1000 Subject: [PATCH 19/26] wip --- src/Entries/JobFinished.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Entries/JobFinished.php b/src/Entries/JobFinished.php index 448f117c..a2243ce8 100644 --- a/src/Entries/JobFinished.php +++ b/src/Entries/JobFinished.php @@ -25,7 +25,7 @@ public function perform(Connection $db): void $db->table($this->table()->value) ->where('job_id', $this->jobId) ->update([ - 'duration' => DB::raw('TIMESTAMPDIFF(MICROSECOND, `processing_started_at`, "'.$this->endedAt.'") / 1000'), + 'duration' => $db->raw('TIMESTAMPDIFF(MICROSECOND, `processing_started_at`, "'.$this->endedAt.'") / 1000'), ]); } From b9b97bc31331db64cb7424913d471f877de760a2 Mon Sep 17 00:00:00 2001 From: timacdonald Date: Tue, 15 Aug 2023 05:01:00 +0000 Subject: [PATCH 20/26] Fix code styling --- src/Entries/JobFinished.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Entries/JobFinished.php b/src/Entries/JobFinished.php index a2243ce8..434d1c20 100644 --- a/src/Entries/JobFinished.php +++ b/src/Entries/JobFinished.php @@ -3,7 +3,6 @@ namespace Laravel\Pulse\Entries; use Illuminate\Database\Connection; -use Illuminate\Support\Facades\DB; class JobFinished extends Update { From cac295d3efc34fd4d66bd06cb94d5dd885b60336 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 15:01:17 +1000 Subject: [PATCH 21/26] wip --- phpstan.neon.dist => _phpstan.neon.dist | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename phpstan.neon.dist => _phpstan.neon.dist (100%) diff --git a/phpstan.neon.dist b/_phpstan.neon.dist similarity index 100% rename from phpstan.neon.dist rename to _phpstan.neon.dist From b581855ae9d9f666153681dfc4c2609155561a69 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 15:08:06 +1000 Subject: [PATCH 22/26] wip --- src/Ingests/Redis.php | 2 +- src/Storage/Database.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Ingests/Redis.php b/src/Ingests/Redis.php index 1ed70aba..deafd6ff 100644 --- a/src/Ingests/Redis.php +++ b/src/Ingests/Redis.php @@ -21,7 +21,7 @@ class Redis implements Ingest /** * Create a new Redis Ingest instance. * - * @param array $config + * @param array{retain: string} $config */ public function __construct(protected array $config, protected RedisConnection $connection) { diff --git a/src/Storage/Database.php b/src/Storage/Database.php index 86867974..02efe2e7 100644 --- a/src/Storage/Database.php +++ b/src/Storage/Database.php @@ -15,7 +15,7 @@ class Database implements Storage /** * Create a new Database Storage instance. * - * @param array $config + * @param array{connection: string, retain: \DateInterval} $config */ public function __construct(protected array $config, protected DatabaseManager $manager) { From f23157b18f1101fa55fe901972a0ca0fc9e2dffe Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 15:41:53 +1000 Subject: [PATCH 23/26] wip --- config/pulse.php | 9 ++------- src/Ingests/Redis.php | 3 ++- src/Pulse.php | 7 ++----- src/PulseServiceProvider.php | 2 +- 4 files changed, 7 insertions(+), 14 deletions(-) diff --git a/config/pulse.php b/config/pulse.php index b9399af7..b4283d3f 100644 --- a/config/pulse.php +++ b/config/pulse.php @@ -23,17 +23,14 @@ 'database' => [ 'connection' => env('PULSE_DB_CONNECTION') ?? env('DB_CONNECTION') ?? 'mysql', - // TODO can we just use this instead of caring about Redis time? - // Then we can just use our time and this can be tweaked if needed - // to adjust for out of time sync issues. 'retain' => Interval::days(7), ], ], 'ingest' => [ - 'driver' => env('PULSE_INGEST_DRIVER', 'redis'), + 'driver' => env('PULSE_INGEST_DRIVER', 'storage'), - 'storage' => [], + 'lottery' => [1, 100], 'redis' => [ 'connection' => env('PULSE_REDIS_CONNECTION') ?? 'default', @@ -42,8 +39,6 @@ ], // TODO: filter configuration? - // TODO: trim lottery configuration - // TODO: configure days of data to store? default: 7 // in milliseconds 'slow_endpoint_threshold' => 1000, diff --git a/src/Ingests/Redis.php b/src/Ingests/Redis.php index deafd6ff..a5e41810 100644 --- a/src/Ingests/Redis.php +++ b/src/Ingests/Redis.php @@ -8,6 +8,7 @@ use Laravel\Pulse\Contracts\Ingest; use Laravel\Pulse\Contracts\Storage; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Entries\Update; use Laravel\Pulse\Redis as RedisConnection; @@ -88,7 +89,7 @@ public function store(Storage $storage, int $count): int ->partition(fn ($entry) => $entry['type'] !== 'pulse_update'); $inserts = $inserts->map(fn ($data) => with(json_decode($data['data'], true, flags: JSON_THROW_ON_ERROR), function ($data) { - return new Entry($data['table'], $data['attributes']); + return new Entry(Table::from($data['table']), $data['attributes']); })); $updates = $updates->map(fn ($data): Update => unserialize($data['data'])); diff --git a/src/Pulse.php b/src/Pulse.php index 77aa02d6..cc9b0c5d 100644 --- a/src/Pulse.php +++ b/src/Pulse.php @@ -71,7 +71,7 @@ class Pulse /** * Create a new Pulse instance. */ - public function __construct(protected Ingest $ingest) + public function __construct(protected array $config, protected Ingest $ingest) { $this->filters = collect([]); @@ -133,15 +133,12 @@ public function store(): self return $this->clearQueue(); } - // TODO: logging? report? We should have a "handlePulseExceptionsUsing" - // and allow user-land control, but by default we just ignore. $this->rescue(fn () => $this->ingest->ingest( $this->entriesQueue->filter($this->shouldRecord(...)), $this->updatesQueue->filter($this->shouldRecord(...)), )); - // TODO: lottery configuration? - $this->rescue(fn () => Lottery::odds(1, 100) + $this->rescue(fn () => Lottery::odds(...$this->config['ingest']['lottery']) ->winner(fn () => $this->ingest->trim()) ->choose()); diff --git a/src/PulseServiceProvider.php b/src/PulseServiceProvider.php index 055f0fad..bc462b6a 100644 --- a/src/PulseServiceProvider.php +++ b/src/PulseServiceProvider.php @@ -60,7 +60,7 @@ public function register(): void return; } - $this->app->singleton(Pulse::class); + $this->app->singleton(Pulse::class, fn ($app) => new Pulse(Config::get('pulse'), $app[Ingest::class])); $this->app->singleton(Storage::class, function ($app) { $driver = Config::get('pulse.storage.driver'); From b1f62f24afb3065fde692cb1bc2db45923d8dab8 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 15:51:46 +1000 Subject: [PATCH 24/26] wip --- config/pulse.php | 8 +++++--- src/PulseServiceProvider.php | 11 +++++++++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/config/pulse.php b/config/pulse.php index b4283d3f..e7cb4d49 100644 --- a/config/pulse.php +++ b/config/pulse.php @@ -21,20 +21,22 @@ 'storage' => [ 'driver' => env('PULSE_STORAGE_DRIVER', 'database'), + 'retain' => Interval::days(7), + 'database' => [ 'connection' => env('PULSE_DB_CONNECTION') ?? env('DB_CONNECTION') ?? 'mysql', - 'retain' => Interval::days(7), ], ], 'ingest' => [ - 'driver' => env('PULSE_INGEST_DRIVER', 'storage'), + 'driver' => env('PULSE_INGEST_DRIVER', 'redis'), + + 'retain' => Interval::days(7), 'lottery' => [1, 100], 'redis' => [ 'connection' => env('PULSE_REDIS_CONNECTION') ?? 'default', - 'retain' => Interval::days(7), ], ], diff --git a/src/PulseServiceProvider.php b/src/PulseServiceProvider.php index bc462b6a..39cdb402 100644 --- a/src/PulseServiceProvider.php +++ b/src/PulseServiceProvider.php @@ -12,6 +12,7 @@ use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; use Illuminate\Queue\Events\JobQueued; +use Illuminate\Support\Arr; use Illuminate\Support\Facades\Blade; use Illuminate\Support\Facades\Config; use Illuminate\Support\Facades\Event; @@ -65,7 +66,10 @@ public function register(): void $this->app->singleton(Storage::class, function ($app) { $driver = Config::get('pulse.storage.driver'); - $config = Config::get("pulse.storage.{$driver}"); + $config = [ + ...Arr::only(Config::get('pulse.storage'), ['retain']), + ...Config::get("pulse.storage.{$driver}"), + ]; return new Database($config, $app['db']); }); @@ -77,7 +81,10 @@ public function register(): void return $app[StorageIngest::class]; } - $ingestConfig = Config::get("pulse.ingest.{$driver}"); + $ingestConfig = [ + ...Arr::only(Config::get('pulse.ingest'), ['retain', 'lottery']), + ...Config::get("pulse.ingest.{$driver}"), + ]; $redisConfig = [ ...Config::get('database.redis.options'), From ca9d461e73ad65bbd9a07eeb11668d0b998430b2 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 15:58:31 +1000 Subject: [PATCH 25/26] wip --- config/pulse.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/config/pulse.php b/config/pulse.php index e7cb4d49..adecde92 100644 --- a/config/pulse.php +++ b/config/pulse.php @@ -29,10 +29,12 @@ ], 'ingest' => [ - 'driver' => env('PULSE_INGEST_DRIVER', 'redis'), + 'driver' => env('PULSE_INGEST_DRIVER', 'storage'), + // TODO how does this play with "storage" and the conflicting key above. 'retain' => Interval::days(7), + // TODO this might conflict with sampling lottery / whatevers 'lottery' => [1, 100], 'redis' => [ From 7a11461d193c22274d74b1fe53f6346f6838f018 Mon Sep 17 00:00:00 2001 From: Tim MacDonald Date: Tue, 15 Aug 2023 16:28:28 +1000 Subject: [PATCH 26/26] wip --- src/Checks/QueueSize.php | 12 ++++++++---- src/Checks/SystemStats.php | 11 ++++++----- src/Commands/CheckCommand.php | 14 ++++---------- src/Pulse.php | 1 + 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/Checks/QueueSize.php b/src/Checks/QueueSize.php index fe82a6b4..cbd557fb 100644 --- a/src/Checks/QueueSize.php +++ b/src/Checks/QueueSize.php @@ -2,25 +2,29 @@ namespace Laravel\Pulse\Checks; +use Carbon\CarbonImmutable; use Illuminate\Support\Collection; use Illuminate\Support\Facades\Config; use Illuminate\Support\Facades\Queue; +use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; class QueueSize { /** * Resolve the queue size. * - * @return \Illuminate\Support\Collection> + * @return \Illuminate\Support\Collection */ - public function __invoke(): Collection + public function __invoke(CarbonImmutable $now): Collection { - return collect(Config::get('pulse.queues'))->map(fn ($queue) => [ + return collect(Config::get('pulse.queues'))->map(fn ($queue) => new Entry(Table::QueueSize, [ + 'date' => $now->toDateTimeString(), 'queue' => $queue, 'size' => Queue::size($queue), 'failed' => collect(app('queue.failer')->all()) ->filter(fn ($job) => $job->queue === $queue) ->count(), - ]); + ])); } } diff --git a/src/Checks/SystemStats.php b/src/Checks/SystemStats.php index 3ca8fa91..f8984395 100644 --- a/src/Checks/SystemStats.php +++ b/src/Checks/SystemStats.php @@ -2,20 +2,21 @@ namespace Laravel\Pulse\Checks; -use Illuminate\Support\Collection; +use Carbon\CarbonImmutable; use Illuminate\Support\Facades\Config; +use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use RuntimeException; class SystemStats { /** * Resolve the systems stats. - * - * @return \Illuminate\Support\Collection */ - public function __invoke(): Collection + public function __invoke(CarbonImmutable $now): Entry { - return collect([ + return new Entry(Table::Server, [ + 'date' => $now->toDateTimeString(), 'server' => Config::get('pulse.server_name'), ...match (PHP_OS_FAMILY) { 'Darwin' => [ diff --git a/src/Commands/CheckCommand.php b/src/Commands/CheckCommand.php index 4204c9f0..4c17a4e4 100644 --- a/src/Commands/CheckCommand.php +++ b/src/Commands/CheckCommand.php @@ -66,24 +66,18 @@ public function handle( * Collect server stats. */ - Pulse::record(new Entry(Table::Server, [ - 'date' => $lastSnapshotAt->toDateTimeString(), - ...$stats = $systemStats(), - ])); + Pulse::record($entry = $systemStats($lastSnapshotAt)); - $this->line('[system stats] '.$stats->toJson()); + $this->line('[system stats] '.json_encode($entry->attributes)); /* * Collect queue sizes. */ if (Cache::lock("illuminate:pulse:check-queue-sizes:{$lastSnapshotAt->timestamp}", $this->interval)->get()) { - $sizes = $queueSize()->each(fn ($queue) => Pulse::record(new Entry(Table::QueueSize, [ - 'date' => $lastSnapshotAt->toDateTimeString(), - ...$queue, - ]))); + $entries = $queueSize($lastSnapshotAt)->each(fn ($entry) => Pulse::record($entry)); - $this->line('[queue sizes] '.$sizes->toJson()); + $this->line('[queue sizes] '.$entries->pluck('attributes')->toJson()); } Pulse::store(); diff --git a/src/Pulse.php b/src/Pulse.php index cc9b0c5d..5cd9075a 100644 --- a/src/Pulse.php +++ b/src/Pulse.php @@ -264,6 +264,7 @@ public function rescue(callable $callback): void try { $callback(); } catch (Throwable $e) { + // TODO is this a good default? ($this->handleExceptionsUsing ?? fn () => null)($e); } }