diff --git a/phpstan.neon.dist b/_phpstan.neon.dist similarity index 57% rename from phpstan.neon.dist rename to _phpstan.neon.dist index 70062030..099b4941 100644 --- a/phpstan.neon.dist +++ b/_phpstan.neon.dist @@ -1,6 +1,5 @@ parameters: paths: - - config - src - level: 0 + level: 8 diff --git a/config/pulse.php b/config/pulse.php index 7c760110..adecde92 100644 --- a/config/pulse.php +++ b/config/pulse.php @@ -1,5 +1,6 @@ env('PULSE_SERVER_NAME', gethostname()), - // 'ingest' => Laravel\Pulse\Ingests\Database::class, - 'ingest' => Laravel\Pulse\Ingests\Redis::class, + 'storage' => [ + 'driver' => env('PULSE_STORAGE_DRIVER', 'database'), + + 'retain' => Interval::days(7), + + 'database' => [ + 'connection' => env('PULSE_DB_CONNECTION') ?? env('DB_CONNECTION') ?? 'mysql', + ], + ], + + 'ingest' => [ + 'driver' => env('PULSE_INGEST_DRIVER', 'storage'), + + // TODO how does this play with "storage" and the conflicting key above. + 'retain' => Interval::days(7), + + // TODO this might conflict with sampling lottery / whatevers + 'lottery' => [1, 100], + + 'redis' => [ + 'connection' => env('PULSE_REDIS_CONNECTION') ?? 'default', + ], + ], // TODO: filter configuration? - // TODO: trim lottery configuration - // TODO: configure days of data to store? default: 7 // in milliseconds 'slow_endpoint_threshold' => 1000, diff --git a/src/Checks/QueueSize.php b/src/Checks/QueueSize.php new file mode 100644 index 00000000..cbd557fb --- /dev/null +++ b/src/Checks/QueueSize.php @@ -0,0 +1,30 @@ + + */ + public function __invoke(CarbonImmutable $now): Collection + { + return collect(Config::get('pulse.queues'))->map(fn ($queue) => new Entry(Table::QueueSize, [ + 'date' => $now->toDateTimeString(), + 'queue' => $queue, + 'size' => Queue::size($queue), + 'failed' => collect(app('queue.failer')->all()) + ->filter(fn ($job) => $job->queue === $queue) + ->count(), + ])); + } +} diff --git a/src/Checks/SystemStats.php b/src/Checks/SystemStats.php new file mode 100644 index 00000000..f8984395 --- /dev/null +++ b/src/Checks/SystemStats.php @@ -0,0 +1,41 @@ + $now->toDateTimeString(), + 'server' => Config::get('pulse.server_name'), + ...match (PHP_OS_FAMILY) { + 'Darwin' => [ + 'cpu_percent' => (int) `top -l 1 | grep -E "^CPU" | tail -1 | awk '{ print $3 + $5 }'`, + 'memory_total' => $memoryTotal = intval(`sysctl hw.memsize | grep -Eo '[0-9]+'` / 1024 / 1024), // MB + 'memory_used' => $memoryTotal - intval(intval(`vm_stat | grep 'Pages free' | grep -Eo '[0-9]+'`) * intval(`pagesize`) / 1024 / 1024), // MB + ], + 'Linux' => [ + 'cpu_percent' => (int) `top -bn1 | grep '%Cpu(s)' | tail -1 | grep -Eo '[0-9]+\.[0-9]+' | head -n 4 | tail -1 | awk '{ print 100 - $1 }'`, + 'memory_total' => $memoryTotal = intval(`cat /proc/meminfo | grep MemTotal | grep -E -o '[0-9]+'` / 1024), // MB + 'memory_used' => $memoryTotal - intval(`cat /proc/meminfo | grep MemAvailable | grep -E -o '[0-9]+'` / 1024), // MB + ], + default => throw new RuntimeException('The pulse:check command does not currently support '.PHP_OS_FAMILY), + }, + 'storage' => collect(Config::get('pulse.directories'))->map(fn ($directory) => [ + 'directory' => $directory, + 'total' => $total = intval(round(disk_total_space($directory) / 1024 / 1024)), // MB + 'used' => intval(round($total - (disk_free_space($directory) / 1024 / 1024))), // MB + ])->toJson(), + ]); + } +} diff --git a/src/Commands/CheckCommand.php b/src/Commands/CheckCommand.php index 786aaa98..4c17a4e4 100644 --- a/src/Commands/CheckCommand.php +++ b/src/Commands/CheckCommand.php @@ -5,10 +5,13 @@ use Carbon\CarbonImmutable; use Illuminate\Console\Command; use Illuminate\Support\Facades\Cache; -use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Queue; use Illuminate\Support\Sleep; -use RuntimeException; +use Laravel\Pulse\Checks\QueueSize; +use Laravel\Pulse\Checks\SystemStats; +use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; +use Laravel\Pulse\Facades\Pulse; use Symfony\Component\Console\Attribute\AsCommand; #[AsCommand(name: 'pulse:check')] @@ -36,24 +39,22 @@ class CheckCommand extends Command /** * Handle the command. */ - public function handle(): int - { + public function handle( + SystemStats $systemStats, + QueueSize $queueSize, + ): int { $lastRestart = Cache::get('illuminate:pulse:restart'); $lastSnapshotAt = (new CarbonImmutable)->floorSeconds($this->interval); while (true) { - if (Cache::get('illuminate:pulse:restart') !== $lastRestart) { - $this->comment('Pulse restart requested. Exiting at '.$now->toDateTimeString()); + $now = new CarbonImmutable(); + if (Cache::get('illuminate:pulse:restart') !== $lastRestart) { return self::SUCCESS; } - $now = new CarbonImmutable(); - if ($now->subSeconds($this->interval)->lessThan($lastSnapshotAt)) { - $this->comment('Sleeping for a second at '.$now->toDateTimeString()); - Sleep::for(1)->second(); continue; @@ -62,82 +63,24 @@ public function handle(): int $lastSnapshotAt = $now->floorSeconds($this->interval); /* - * Check system stats. + * Collect server stats. */ - $stats = [ - 'date' => $lastSnapshotAt->toDateTimeString(), - 'server' => config('pulse.server_name'), - ...$this->getStats(), - 'storage' => collect(config('pulse.directories'))->map(fn ($directory) => [ - 'directory' => $directory, - 'total' => $total = intval(round(disk_total_space($directory) / 1024 / 1024)), // MB - 'used' => intval(round($total - (disk_free_space($directory) / 1024 / 1024))), // MB - ])->toJson(), - ]; - - // TODO: do this via the injest - DB::table('pulse_servers')->insert($stats); + Pulse::record($entry = $systemStats($lastSnapshotAt)); - $this->line('[system stats] '.json_encode($stats)); + $this->line('[system stats] '.json_encode($entry->attributes)); /* - * Check queue sizes. + * Collect queue sizes. */ if (Cache::lock("illuminate:pulse:check-queue-sizes:{$lastSnapshotAt->timestamp}", $this->interval)->get()) { - $sizes = collect(config('pulse.queues')) - ->map(fn ($queue) => [ - 'date' => $lastSnapshotAt->toDateTimeString(), - 'queue' => $queue, - 'size' => Queue::size($queue), - 'failed' => collect(app('queue.failer')->all()) - ->filter(fn ($job) => $job->queue === $queue) - ->count(), - ]); - - DB::table('pulse_queue_sizes')->insert($sizes->all()); - - $this->line('[queue sizes] '.$sizes->toJson()); + $entries = $queueSize($lastSnapshotAt)->each(fn ($entry) => Pulse::record($entry)); + + $this->line('[queue sizes] '.$entries->pluck('attributes')->toJson()); } - $this->comment('Stats and queue sizes checked at: '.$now->toDateTimeString()); + Pulse::store(); } } - - /** - * Collect stats. - */ - protected function getStats(): array - { - return match (PHP_OS_FAMILY) { - 'Darwin' => $this->getDarwinStats(), - 'Linux' => $this->getLinuxStats(), - default => throw new RuntimeException('The pulse:check command does not currently support '.PHP_OS_FAMILY), - }; - } - - /** - * Collect stats for "Darwin" based systems. - */ - protected function getDarwinStats(): array - { - return [ - 'cpu_percent' => (int) `top -l 1 | grep -E "^CPU" | tail -1 | awk '{ print $3 + $5 }'`, - 'memory_total' => $memoryTotal = intval(`sysctl hw.memsize | grep -Eo '[0-9]+'` / 1024 / 1024), // MB - 'memory_used' => $memoryTotal - intval(intval(`vm_stat | grep 'Pages free' | grep -Eo '[0-9]+'`) * intval(`pagesize`) / 1024 / 1024), // MB - ]; - } - - /** - * Collect stats for "Linux" based systems. - */ - protected function getLinuxStats(): array - { - return [ - 'cpu_percent' => (int) `top -bn1 | grep '%Cpu(s)' | tail -1 | grep -Eo '[0-9]+\.[0-9]+' | head -n 4 | tail -1 | awk '{ print 100 - $1 }'`, - 'memory_total' => $memoryTotal = intval(`cat /proc/meminfo | grep MemTotal | grep -E -o '[0-9]+'` / 1024), // MB - 'memory_used' => $memoryTotal - intval(`cat /proc/meminfo | grep MemAvailable | grep -E -o '[0-9]+'` / 1024), // MB - ]; - } } diff --git a/src/Commands/RestartCommand.php b/src/Commands/RestartCommand.php index 19d2e953..23120412 100644 --- a/src/Commands/RestartCommand.php +++ b/src/Commands/RestartCommand.php @@ -5,8 +5,6 @@ use Illuminate\Console\Command; use Illuminate\Support\Facades\Cache; use Illuminate\Support\InteractsWithTime; -use Laravel\Pulse\Ingests\Database; -use Laravel\Pulse\Ingests\Redis; use Symfony\Component\Console\Attribute\AsCommand; #[AsCommand(name: 'pulse:restart')] diff --git a/src/Commands/WorkCommand.php b/src/Commands/WorkCommand.php index c035ae49..ca5dda7b 100644 --- a/src/Commands/WorkCommand.php +++ b/src/Commands/WorkCommand.php @@ -6,8 +6,8 @@ use Illuminate\Console\Command; use Illuminate\Support\Facades\Cache; use Illuminate\Support\Sleep; -use Laravel\Pulse\Ingests\Database; -use Laravel\Pulse\Ingests\Redis; +use Laravel\Pulse\Contracts\Ingest; +use Laravel\Pulse\Contracts\Storage; use Symfony\Component\Console\Attribute\AsCommand; #[AsCommand(name: 'pulse:work')] @@ -30,34 +30,30 @@ class WorkCommand extends Command /** * Handle the command. */ - public function handle(Redis $redisIngest, Database $databaseIngest): int + public function handle(Ingest $ingest, Storage $storage): int { $lastRestart = Cache::get('illuminate:pulse:restart'); - $lastTrimmedDatabaseAt = (new CarbonImmutable)->startOfMinute(); + $lastTrimmedStorageAt = (new CarbonImmutable)->startOfMinute(); while (true) { $now = new CarbonImmutable; if (Cache::get('illuminate:pulse:restart') !== $lastRestart) { - $this->comment('Pulse restart requested. Exiting at '.$now->toDateTimeString()); - return self::SUCCESS; } - if ($now->subMinute()->greaterThan($lastTrimmedDatabaseAt)) { - $this->comment('Trimming the database at '.$now->toDateTimeString()); + if ($now->subMinute()->greaterThan($lastTrimmedStorageAt)) { + $storage->trim(); - $databaseIngest->trim($now->subWeek()); + $lastTrimmedStorageAt = $now; - $lastTrimmedDatabaseAt = $now; + $this->comment('Storage trimmed'); } - $processed = $redisIngest->processEntries(1000); + $processed = $ingest->store($storage, 1000); if ($processed === 0) { - $this->comment('Queue finished processing. Sleeping at '.$now->toDateTimeString()); - Sleep::for(1)->second(); } else { $this->comment('Processed ['.$processed.'] entries at '.$now->toDateTimeString()); diff --git a/src/Contracts/Ingest.php b/src/Contracts/Ingest.php index c14750fa..779a933d 100644 --- a/src/Contracts/Ingest.php +++ b/src/Contracts/Ingest.php @@ -2,17 +2,25 @@ namespace Laravel\Pulse\Contracts; -use Carbon\CarbonImmutable; +use Illuminate\Support\Collection; interface Ingest { /** - * Ingest the entries and updates without throwing exceptions. + * Ingest the entries and updates. + * + * @param \Illuminate\Support\Collection $entries + * @param \Illuminate\Support\Collection $updates */ - public function ingestSilently(array $entries, array $updates): void; + public function ingest(Collection $entries, Collection $updates): void; /** - * Trim the ingest without throwing exceptions. + * Trim the ingested entries. */ - public function trimSilently(CarbonImmutable $oldest): void; + public function trim(): void; + + /** + * Store the ingested entries. + */ + public function store(Storage $storage, int $count): int; } diff --git a/src/Contracts/Storage.php b/src/Contracts/Storage.php new file mode 100644 index 00000000..02230318 --- /dev/null +++ b/src/Contracts/Storage.php @@ -0,0 +1,21 @@ + $entries + * @param \Illuminate\Support\Collection $updates + */ + public function store(Collection $entries, Collection $updates): void; + + /** + * Trim the stored entries. + */ + public function trim(): void; +} diff --git a/src/Entries/Entry.php b/src/Entries/Entry.php index 6d1d668d..94d78958 100644 --- a/src/Entries/Entry.php +++ b/src/Entries/Entry.php @@ -6,8 +6,10 @@ class Entry { /** * Create a new Entry instance. + * + * @param array $attributes */ - public function __construct(public string $table, public array $attributes) + public function __construct(public Table $table, public array $attributes) { // } @@ -15,24 +17,8 @@ public function __construct(public string $table, public array $attributes) /** * The entries table. */ - public function table(): string + public function table(): Table { return $this->table; } - - /** - * Determine if the update is the given type. - */ - public function is(Type $type): bool - { - return $this->type() === $type; - } - - /** - * The type of update. - */ - public function type(): Type - { - return Type::from($this->table()); - } } diff --git a/src/Entries/JobFinished.php b/src/Entries/JobFinished.php index 4f022954..434d1c20 100644 --- a/src/Entries/JobFinished.php +++ b/src/Entries/JobFinished.php @@ -2,7 +2,7 @@ namespace Laravel\Pulse\Entries; -use Illuminate\Support\Facades\DB; +use Illuminate\Database\Connection; class JobFinished extends Update { @@ -19,20 +19,20 @@ public function __construct( /** * Perform the update. */ - public function perform(): void + public function perform(Connection $db): void { - $this->query() + $db->table($this->table()->value) ->where('job_id', $this->jobId) ->update([ - 'duration' => DB::raw('TIMESTAMPDIFF(MICROSECOND, `processing_started_at`, "'.$this->endedAt.'") / 1000'), + 'duration' => $db->raw('TIMESTAMPDIFF(MICROSECOND, `processing_started_at`, "'.$this->endedAt.'") / 1000'), ]); } /** * The update's table. */ - public function table(): string + public function table(): Table { - return 'pulse_jobs'; + return Table::Job; } } diff --git a/src/Entries/JobStarted.php b/src/Entries/JobStarted.php index 759c77f0..edadbfa0 100644 --- a/src/Entries/JobStarted.php +++ b/src/Entries/JobStarted.php @@ -2,6 +2,8 @@ namespace Laravel\Pulse\Entries; +use Illuminate\Database\Connection; + class JobStarted extends Update { /** @@ -17,9 +19,9 @@ public function __construct( /** * Perform the update. */ - public function perform(): void + public function perform(Connection $db): void { - $this->query() + $db->table($this->table()->value) ->where('job_id', $this->jobId) ->update([ 'processing_started_at' => $this->startedAt, @@ -29,8 +31,8 @@ public function perform(): void /** * The update's table. */ - public function table(): string + public function table(): Table { - return 'pulse_jobs'; + return Table::Job; } } diff --git a/src/Entries/Type.php b/src/Entries/Table.php similarity index 86% rename from src/Entries/Type.php rename to src/Entries/Table.php index 75bc5aa8..33784990 100644 --- a/src/Entries/Type.php +++ b/src/Entries/Table.php @@ -4,7 +4,7 @@ use Illuminate\Support\Collection; -enum Type: string +enum Table: string { case CacheHit = 'pulse_cache_hits'; case Exception = 'pulse_exceptions'; @@ -17,6 +17,8 @@ enum Type: string /** * Get all cases as a Collection. + * + * @return \Illuminate\Support\Collection */ public static function all(): Collection { diff --git a/src/Entries/Update.php b/src/Entries/Update.php index 304d146f..be67e28e 100644 --- a/src/Entries/Update.php +++ b/src/Entries/Update.php @@ -2,42 +2,17 @@ namespace Laravel\Pulse\Entries; -use Illuminate\Database\Query\Builder; -use Illuminate\Support\Facades\DB; +use Illuminate\Database\Connection; abstract class Update { /** * The update's table. */ - abstract public function table(): string; + abstract public function table(): Table; /** * Perform the update. */ - abstract public function perform(): void; - - /** - * Determine if the update is the given type. - */ - public function is(Type $type): bool - { - return $this->type() === $type; - } - - /** - * The type of update. - */ - public function type(): Type - { - return Type::from($this->table()); - } - - /** - * The table query. - */ - protected function query(): Builder - { - return DB::table($this->table()); - } + abstract public function perform(Connection $db): void; } diff --git a/src/Facades/Pulse.php b/src/Facades/Pulse.php index 7610e31c..f28f0a08 100644 --- a/src/Facades/Pulse.php +++ b/src/Facades/Pulse.php @@ -6,17 +6,20 @@ /** * @method static \Laravel\Pulse\Pulse shouldNotRecord() - * @method static void filter(callable $filter) - * @method static \Laravel\Pulse\Pulse resolveUsersUsing(void $callback) + * @method static \Laravel\Pulse\Pulse filter(callable $filter) + * @method static \Laravel\Pulse\Pulse record(\Laravel\Pulse\Entries\Entry $entry) + * @method static \Laravel\Pulse\Pulse recordUpdate(\Laravel\Pulse\Entries\Update $update) + * @method static \Laravel\Pulse\Pulse store() + * @method static \Laravel\Pulse\Pulse resolveUsersUsing(callable $callback) * @method static \Illuminate\Support\Collection resolveUsers(\Illuminate\Support\Collection $ids) - * @method static void record(\Laravel\Pulse\Entries\Entry $entry) - * @method static void recordUpdate(\Laravel\Pulse\Entries\Update $update) - * @method static void store() * @method static string css() * @method static string js() * @method static bool check(\Illuminate\Http\Request $request) - * @method static \Laravel\Pulse\Pulse auth(\Closure $callback) + * @method static \Laravel\Pulse\Pulse auth(callable $callback) * @method static \Laravel\Pulse\Pulse ignoreMigrations() + * @method static bool runsMigrations() + * @method static \Laravel\Pulse\Pulse handleExceptionsUsing(callable $callback) + * @method static void rescue(callable $callback) * @method static void listenForStorageOpportunities(\Illuminate\Foundation\Application $app) * * @see \Laravel\Pulse\Pulse diff --git a/src/Handlers/HandleCacheInteraction.php b/src/Handlers/HandleCacheInteraction.php index 81ae85fa..f7d0159e 100644 --- a/src/Handlers/HandleCacheInteraction.php +++ b/src/Handlers/HandleCacheInteraction.php @@ -7,6 +7,7 @@ use Illuminate\Cache\Events\CacheMissed; use Illuminate\Support\Facades\Auth; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; class HandleCacheInteraction @@ -16,19 +17,19 @@ class HandleCacheInteraction */ public function __invoke(CacheHit|CacheMissed $event): void { - rescue(function () use ($event) { + Pulse::rescue(function () use ($event) { $now = new CarbonImmutable(); if (str_starts_with($event->key, 'illuminate:')) { return; } - Pulse::record(new Entry('pulse_cache_hits', [ + Pulse::record(new Entry(Table::CacheHit, [ 'date' => $now->toDateTimeString(), 'hit' => $event instanceof CacheHit, 'key' => $event->key, 'user_id' => Auth::id(), ])); - }, report: false); + }); } } diff --git a/src/Handlers/HandleException.php b/src/Handlers/HandleException.php index f359873d..800c4620 100644 --- a/src/Handlers/HandleException.php +++ b/src/Handlers/HandleException.php @@ -6,6 +6,7 @@ use Illuminate\Support\Facades\Auth; use Illuminate\Support\Str; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; use Throwable; @@ -16,16 +17,16 @@ class HandleException */ public function __invoke(Throwable $e): void { - rescue(function () use ($e) { + Pulse::rescue(function () use ($e) { $now = new CarbonImmutable(); - Pulse::record(new Entry('pulse_exceptions', [ + Pulse::record(new Entry(Table::Exception, [ 'date' => $now->toDateTimeString(), 'user_id' => Auth::id(), 'class' => $e::class, 'location' => $this->getLocation($e), ])); - }, report: false); + }); } /** diff --git a/src/Handlers/HandleHttpRequest.php b/src/Handlers/HandleHttpRequest.php index 393b8527..a88d372d 100644 --- a/src/Handlers/HandleHttpRequest.php +++ b/src/Handlers/HandleHttpRequest.php @@ -7,6 +7,7 @@ use Illuminate\Support\Carbon; use Illuminate\Support\Str; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; use Symfony\Component\HttpFoundation\Response; @@ -17,15 +18,15 @@ class HandleHttpRequest */ public function __invoke(Carbon $startedAt, Request $request, Response $response): void { - rescue(function () use ($startedAt, $request) { + Pulse::rescue(function () use ($startedAt, $request) { $now = new CarbonImmutable(); - Pulse::record(new Entry('pulse_requests', [ + Pulse::record(new Entry(Table::Request, [ 'date' => $startedAt->toDateTimeString(), 'user_id' => $request->user()?->id, 'route' => $request->method().' '.Str::start(($request->route()?->uri() ?? $request->path()), '/'), 'duration' => $startedAt->diffInMilliseconds(), ])); - }, report: false); + }); } } diff --git a/src/Handlers/HttpRequestMiddleware.php b/src/Handlers/HandleOutgoingRequest.php similarity index 65% rename from src/Handlers/HttpRequestMiddleware.php rename to src/Handlers/HandleOutgoingRequest.php index b3c08faf..4cd62819 100644 --- a/src/Handlers/HttpRequestMiddleware.php +++ b/src/Handlers/HandleOutgoingRequest.php @@ -3,29 +3,34 @@ namespace Laravel\Pulse\Handlers; use Carbon\CarbonImmutable; +use GuzzleHttp\Promise\PromiseInterface; use GuzzleHttp\Promise\RejectedPromise; use Illuminate\Support\Facades\Auth; use Illuminate\Support\Str; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; use Psr\Http\Message\RequestInterface; -class HttpRequestMiddleware +class HandleOutgoingRequest { /** * Invoke the middleware. + * + * @param (callable(\Psr\Http\Message\RequestInterface, array): PromiseInterface) $handler + * @return (callable(\Psr\Http\Message\RequestInterface, array): PromiseInterface) */ - public function __invoke(callable $handler) + public function __invoke(callable $handler): callable { return function ($request, $options) use ($handler) { $startedAt = new CarbonImmutable; return $handler($request, $options)->then(function ($response) use ($request, $startedAt) { - rescue(fn () => $this->record($request, $startedAt, new CarbonImmutable), report: false); + Pulse::rescue(fn () => $this->record($request, $startedAt, new CarbonImmutable)); return $response; }, function ($exception) use ($request, $startedAt) { - rescue(fn () => $this->record($request, $startedAt, new CarbonImmutable), report: false); + Pulse::rescue(fn () => $this->record($request, $startedAt, new CarbonImmutable)); return new RejectedPromise($exception); }); @@ -37,7 +42,7 @@ public function __invoke(callable $handler) */ protected function record(RequestInterface $request, CarbonImmutable $startedAt, CarbonImmutable $endedAt): void { - Pulse::record(new Entry('pulse_outgoing_requests', [ + Pulse::record(new Entry(Table::OutgoingRequest, [ 'uri' => $request->getMethod().' '.Str::before($request->getUri(), '?'), 'date' => $startedAt->toDateTimeString(), 'duration' => $startedAt->diffInMilliseconds($endedAt), diff --git a/src/Handlers/HandleProcessedJob.php b/src/Handlers/HandleProcessedJob.php index fc976a03..9303d72b 100644 --- a/src/Handlers/HandleProcessedJob.php +++ b/src/Handlers/HandleProcessedJob.php @@ -15,7 +15,7 @@ class HandleProcessedJob */ public function __invoke(JobProcessed|JobFailed $event): void { - rescue(function () use ($event) { + Pulse::rescue(function () use ($event) { $now = new CarbonImmutable(); // TODO respect slow limit configuration? I don't think we should @@ -26,6 +26,6 @@ public function __invoke(JobProcessed|JobFailed $event): void (string) $event->job->getJobId(), $now->toDateTimeString('millisecond') )); - }, report: false); + }); } } diff --git a/src/Handlers/HandleProcessingJob.php b/src/Handlers/HandleProcessingJob.php index 4febfa44..869455b2 100644 --- a/src/Handlers/HandleProcessingJob.php +++ b/src/Handlers/HandleProcessingJob.php @@ -14,13 +14,13 @@ class HandleProcessingJob */ public function __invoke(JobProcessing $event): void { - rescue(function () use ($event) { + Pulse::rescue(function () use ($event) { $now = new CarbonImmutable(); Pulse::recordUpdate(new JobStarted( (string) $event->job->getJobId(), $now->toDateTimeString('millisecond') )); - }, report: false); + }); } } diff --git a/src/Handlers/HandleQuery.php b/src/Handlers/HandleQuery.php index 06421f75..2e864716 100644 --- a/src/Handlers/HandleQuery.php +++ b/src/Handlers/HandleQuery.php @@ -5,7 +5,9 @@ use Carbon\CarbonImmutable; use Illuminate\Database\Events\QueryExecuted; use Illuminate\Support\Facades\Auth; +use Illuminate\Support\Facades\Config; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; class HandleQuery @@ -15,19 +17,19 @@ class HandleQuery */ public function __invoke(QueryExecuted $event): void { - rescue(function () use ($event) { + Pulse::rescue(function () use ($event) { $now = new CarbonImmutable(); - if ($event->time < config('pulse.slow_query_threshold')) { + if ($event->time < Config::get('pulse.slow_query_threshold')) { return; } - Pulse::record(new Entry('pulse_queries', [ - 'date' => $now->subMilliseconds(round($event->time))->toDateTimeString(), + Pulse::record(new Entry(Table::Query, [ + 'date' => $now->subMilliseconds((int) $event->time)->toDateTimeString(), 'user_id' => Auth::id(), 'sql' => $event->sql, - 'duration' => round($event->time), + 'duration' => (int) $event->time, ])); - }, report: false); + }); } } diff --git a/src/Handlers/HandleQueuedJob.php b/src/Handlers/HandleQueuedJob.php index 6b2b024f..f64763b2 100644 --- a/src/Handlers/HandleQueuedJob.php +++ b/src/Handlers/HandleQueuedJob.php @@ -6,6 +6,7 @@ use Illuminate\Queue\Events\JobQueued; use Illuminate\Support\Facades\Auth; use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Facades\Pulse; class HandleQueuedJob @@ -15,10 +16,10 @@ class HandleQueuedJob */ public function __invoke(JobQueued $event): void { - rescue(function () use ($event) { + Pulse::rescue(function () use ($event) { $now = new CarbonImmutable(); - Pulse::record(new Entry('pulse_jobs', [ + Pulse::record(new Entry(Table::Job, [ 'date' => $now->toDateTimeString(), 'user_id' => Auth::id(), 'job' => is_string($event->job) @@ -26,6 +27,6 @@ public function __invoke(JobQueued $event): void : $event->job::class, 'job_id' => $event->id, ])); - }, report: false); + }); } } diff --git a/src/Http/Livewire/Cache.php b/src/Http/Livewire/Cache.php index c6129a33..aadcf624 100644 --- a/src/Http/Livewire/Cache.php +++ b/src/Http/Livewire/Cache.php @@ -39,7 +39,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/Concerns/HasPeriod.php b/src/Http/Livewire/Concerns/HasPeriod.php index d984b51c..c419e851 100644 --- a/src/Http/Livewire/Concerns/HasPeriod.php +++ b/src/Http/Livewire/Concerns/HasPeriod.php @@ -11,7 +11,7 @@ trait HasPeriod /** * The usage period. * - * @var '1_hour'|6_hours'|'24_hours'|'7_days'|null + * @var '1_hour'|'6_hours'|'24_hours'|'7_days'|null */ #[Url] public ?string $period = '1_hour'; @@ -19,7 +19,7 @@ trait HasPeriod /** * Handle the period-changed event. * - * @param '1_hour'|6_hours'|'24_hours'|'7_days' $period + * @param '1_hour'|'6_hours'|'24_hours'|'7_days' $period */ #[On('period-changed')] public function updatePeriod(string $period): void diff --git a/src/Http/Livewire/Exceptions.php b/src/Http/Livewire/Exceptions.php index dc4e3103..d522a7e2 100644 --- a/src/Http/Livewire/Exceptions.php +++ b/src/Http/Livewire/Exceptions.php @@ -42,7 +42,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/PeriodSelector.php b/src/Http/Livewire/PeriodSelector.php index 96502d6e..c3b6e4fc 100644 --- a/src/Http/Livewire/PeriodSelector.php +++ b/src/Http/Livewire/PeriodSelector.php @@ -27,6 +27,8 @@ public function render(): Renderable /** * Set the selected period. + * + * @param '1_hour'|'6_hours'|'24_hours'|'7_days' $period */ public function setPeriod(string $period): void { diff --git a/src/Http/Livewire/Queues.php b/src/Http/Livewire/Queues.php index b7cc350e..b7dca68b 100644 --- a/src/Http/Livewire/Queues.php +++ b/src/Http/Livewire/Queues.php @@ -26,7 +26,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/Servers.php b/src/Http/Livewire/Servers.php index 062e1b93..83f9f9a2 100644 --- a/src/Http/Livewire/Servers.php +++ b/src/Http/Livewire/Servers.php @@ -39,7 +39,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-6']); } @@ -116,7 +116,7 @@ protected function servers(): Collection 'cpu_percent' => $server->cpu_percent, 'memory_used' => $server->memory_used, 'memory_total' => $server->memory_total, - 'storage' => json_decode($server->storage), + 'storage' => json_decode($server->storage, flags: JSON_THROW_ON_ERROR), 'readings' => $serverReadings->get($server->server)?->map(fn ($reading) => (object) [ 'cpu_percent' => $reading->cpu_percent, 'memory_used' => $reading->memory_used, diff --git a/src/Http/Livewire/SlowJobs.php b/src/Http/Livewire/SlowJobs.php index 9932c21c..c4d01949 100644 --- a/src/Http/Livewire/SlowJobs.php +++ b/src/Http/Livewire/SlowJobs.php @@ -33,7 +33,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/SlowOutgoingRequests.php b/src/Http/Livewire/SlowOutgoingRequests.php index 0fb383d4..2ca9f853 100644 --- a/src/Http/Livewire/SlowOutgoingRequests.php +++ b/src/Http/Livewire/SlowOutgoingRequests.php @@ -33,7 +33,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/SlowQueries.php b/src/Http/Livewire/SlowQueries.php index 6f2faef8..229e3de3 100644 --- a/src/Http/Livewire/SlowQueries.php +++ b/src/Http/Livewire/SlowQueries.php @@ -33,7 +33,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/SlowRoutes.php b/src/Http/Livewire/SlowRoutes.php index 7d8443fc..319b1278 100644 --- a/src/Http/Livewire/SlowRoutes.php +++ b/src/Http/Livewire/SlowRoutes.php @@ -34,7 +34,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } diff --git a/src/Http/Livewire/Usage.php b/src/Http/Livewire/Usage.php index 2cbf3691..40665672 100644 --- a/src/Http/Livewire/Usage.php +++ b/src/Http/Livewire/Usage.php @@ -60,7 +60,7 @@ public function render(): Renderable /** * Render the placeholder. */ - public function placeholder() + public function placeholder(): Renderable { return view('pulse::components.placeholder', ['class' => 'col-span-3']); } @@ -99,7 +99,7 @@ protected function userRequestCounts(): array return $user ? [ 'count' => $row->count, 'user' => [ - 'name' => $user['name'] ?? 'User: '.$user['id'], + 'name' => $user['name'], // "extra" rather than 'email' // avatar for pretty-ness? 'email' => $user['email'] ?? null, diff --git a/src/Ingests/Database.php b/src/Ingests/Database.php deleted file mode 100644 index 3346638e..00000000 --- a/src/Ingests/Database.php +++ /dev/null @@ -1,56 +0,0 @@ - $this->ingest($entries, $updates), report: false); - } - - /** - * Ingest the entries and updates. - */ - public function ingest(array $entries, array $updates): void - { - if ($entries === [] && $updates === []) { - return; - } - - DB::transaction(function () use ($entries, $updates) { - collect($entries)->each(fn ($rows, $table) => collect($rows) - ->chunk(1000) - ->map->all() - ->each(DB::table($table)->insert(...))); - - collect($updates)->each(fn ($update) => $update->perform()); - }); - } - - /** - * Trim the ingest without throwing exceptions. - */ - public function trimSilently(CarbonImmutable $oldest): void - { - rescue(fn () => $this->trim($oldest), report: false); - } - - /** - * Trim the ingest. - */ - public function trim(CarbonImmutable $oldest): void - { - Type::all()->each(fn (Type $type) => DB::table($type->value) - ->where('date', '<', $oldest->toDateTimeString()) - ->delete()); - } -} diff --git a/src/Ingests/Redis.php b/src/Ingests/Redis.php index f2e9830c..a5e41810 100644 --- a/src/Ingests/Redis.php +++ b/src/Ingests/Redis.php @@ -3,7 +3,12 @@ namespace Laravel\Pulse\Ingests; use Carbon\CarbonImmutable; +use Carbon\CarbonInterval as Interval; +use Illuminate\Support\Collection; use Laravel\Pulse\Contracts\Ingest; +use Laravel\Pulse\Contracts\Storage; +use Laravel\Pulse\Entries\Entry; +use Laravel\Pulse\Entries\Table; use Laravel\Pulse\Entries\Update; use Laravel\Pulse\Redis as RedisConnection; @@ -15,38 +20,35 @@ class Redis implements Ingest protected string $stream = 'illuminate:pulse:entries'; /** - * Create a new Redis ingest instance. + * Create a new Redis Ingest instance. + * + * @param array{retain: string} $config */ - public function __construct(protected RedisConnection $connection, protected Database $db) + public function __construct(protected array $config, protected RedisConnection $connection) { // } - /** - * Ingest the entries and updates without throwing exceptions. - */ - public function ingestSilently(array $entries, array $updates): void - { - rescue(fn () => $this->ingest($entries, $updates), report: false); - } - /** * Ingest the entries and updates. + * + * @param \Illuminate\Support\Collection $entries + * @param \Illuminate\Support\Collection $updates */ - public function ingest(array $entries, array $updates): void + public function ingest(Collection $entries, Collection $updates): void { - if ($entries === [] && $updates === []) { + if ($entries->isEmpty() && $updates->isEmpty()) { return; } - $this->connection->pipeline(function (RedisConnection $pipeline) use ($entries, $updates) { - collect($entries)->each(fn ($rows, $table) => collect($rows) - ->each(fn ($data) => $pipeline->xadd($this->stream, [ + $this->connection->pipeline(function ($pipeline) use ($entries, $updates) { + $entries->groupBy('table.value') + ->each(fn ($rows, $table) => $rows->each(fn ($data) => $pipeline->xadd($this->stream, [ 'type' => $table, - 'data' => json_encode($data), + 'data' => json_encode($data, flags: JSON_THROW_ON_ERROR), ]))); - collect($updates)->each(fn ($update) => $pipeline->xadd($this->stream, [ + $updates->each(fn ($update) => $pipeline->xadd($this->stream, [ 'type' => 'pulse_update', 'data' => serialize($update), ])); @@ -54,25 +56,25 @@ public function ingest(array $entries, array $updates): void } /** - * Trim the ingest without throwing exceptions. + * Trim the ingested entries. */ - public function trimSilently(CarbonImmutable $oldest): void + public function trim(): void { - rescue(fn () => $this->trim($oldest), report: false); + $this->connection->xtrim($this->stream, 'MINID', '~', (new CarbonImmutable)->subSeconds((int) $this->trimAfter()->totalSeconds)->getTimestampMs()); } /** - * Trim the ingest. + * The interval to trim the storage to. */ - public function trim(CarbonImmutable $oldest): void + protected function trimAfter(): Interval { - $this->connection->xtrim($this->stream, 'MINID', '~', $this->connection->streamIdAt($oldest)); + return new Interval($this->config['retain'] ?? 'P7D'); } /** - * Process the items on the Redis stream and persist in the database. + * Store the ingested entries. */ - public function processEntries(int $count): int + public function store(Storage $storage, int $count): int { $entries = collect($this->connection->xrange($this->stream, '-', '+', $count)); @@ -86,14 +88,13 @@ public function processEntries(int $count): int ->values() ->partition(fn ($entry) => $entry['type'] !== 'pulse_update'); - $inserts = $inserts - ->groupBy('type') - ->map->map(fn ($data): array => json_decode($data['data'], true)); + $inserts = $inserts->map(fn ($data) => with(json_decode($data['data'], true, flags: JSON_THROW_ON_ERROR), function ($data) { + return new Entry(Table::from($data['table']), $data['attributes']); + })); - $updates = $updates - ->map(fn ($data): Update => unserialize($data['data'])); + $updates = $updates->map(fn ($data): Update => unserialize($data['data'])); - $this->db->ingest($inserts->all(), $updates->all()); + $storage->store($inserts, $updates); $this->connection->xdel($this->stream, $keys->all()); diff --git a/src/Ingests/Storage.php b/src/Ingests/Storage.php new file mode 100644 index 00000000..31f507cd --- /dev/null +++ b/src/Ingests/Storage.php @@ -0,0 +1,46 @@ + $entries + * @param \Illuminate\Support\Collection $updates + */ + public function ingest(Collection $entries, Collection $updates): void + { + $this->storage->store($entries, $updates); + } + + /** + * Trim the ingested entries. + */ + public function trim(): void + { + $this->storage->trim(); + } + + /** + * Store the ingested entries. + */ + public function store(StorageContract $store, int $count): int + { + throw new RuntimeException('The storage ingest driver does not need to process entries.'); + } +} diff --git a/src/ListensForStorageOpportunities.php b/src/ListensForStorageOpportunities.php index 21f49080..f98b63e6 100644 --- a/src/ListensForStorageOpportunities.php +++ b/src/ListensForStorageOpportunities.php @@ -13,9 +13,9 @@ trait ListensForStorageOpportunities /** * An array indicating how many jobs are processing. * - * @var array + * @var array */ - protected static $processingJobs = []; + protected static array $processingJobs = []; /** * Register listeners that store the recorded Telescope entries. diff --git a/src/Pulse.php b/src/Pulse.php index fe4919d0..5cd9075a 100644 --- a/src/Pulse.php +++ b/src/Pulse.php @@ -2,7 +2,6 @@ namespace Laravel\Pulse; -use Carbon\CarbonImmutable; use Closure; use Illuminate\Http\Request; use Illuminate\Support\Collection; @@ -11,52 +10,72 @@ use Laravel\Pulse\Contracts\Ingest; use Laravel\Pulse\Entries\Entry; use Laravel\Pulse\Entries\Update; +use Throwable; class Pulse { use ListensForStorageOpportunities; /** - * The callback that should be used to authenticate Pulse users. + * The list of queued entries to be stored. + * + * @var \Illuminate\Support\Collection */ - public ?Closure $authUsing = null; + protected Collection $entriesQueue; /** - * Indicates if Pulse migrations will be run. + * The list of queued entry updates. + * + * @var \Illuminate\Support\Collection */ - public bool $runsMigrations = true; + protected Collection $updatesQueue; /** - * The list of queued entries to be stored. + * Indicates if Pulse should record entries. */ - public array $entriesQueue = []; + protected bool $shouldRecord = true; /** - * The list of queued entry updates. + * The entry filters. + * + * @var \Illuminate\Support\Collection */ - public array $updatesQueue = []; + protected Collection $filters; /** - * Indicates if Pulse should record entries. + * Users resolver. + * + * @var (callable(\Illuminate\Support\Collection): iterable)|null */ - public bool $shouldRecord = true; + protected $usersResolver; /** - * Users resolver. + * The callback that should be used to authenticate Pulse users. + * + * @var (callable(\Illuminate\Http\Request): bool)|null */ - public ?Closure $usersResolver = null; + protected $authUsing = null; /** - * The entry filters. + * Indicates if Pulse migrations will be run. + */ + protected bool $runsMigrations = true; + + /** + * Handle exceptions using the given callback. + * + * @var (callable(\Throwable): mixed)|null */ - public Collection $filters; + protected $handleExceptionsUsing = null; /** * Create a new Pulse instance. */ - public function __construct(protected Ingest $ingest) + public function __construct(protected array $config, protected Ingest $ingest) { $this->filters = collect([]); + + $this->clearQueue(); } /** @@ -71,8 +90,10 @@ public function shouldNotRecord(): self /** * Filter incoming entries using the provided filter. + * + * @param (callable(\Laravel\Pulse\Entries\Entry|\Laravel\Pulse\Entries\Update): bool) $filter */ - public function filter(callable $filter) + public function filter(callable $filter): self { $this->filters[] = $filter; @@ -80,72 +101,91 @@ public function filter(callable $filter) } /** - * Resolve the user's details using the given closure. + * Record the given entry. */ - public function resolveUsersUsing($callback): self + public function record(Entry $entry): self { - $this->usersResolver = $callback; + if ($this->shouldRecord) { + $this->entriesQueue[] = $entry; + } return $this; } /** - * Resolve the user's details using the given closure. + * Record the given entry update. */ - public function resolveUsers(Collection $ids): Collection + public function recordUpdate(Update $update): self { - if ($this->usersResolver) { - return collect(($this->usersResolver)($ids)); + if ($this->shouldRecord) { + $this->updatesQueue[] = $update; } - if (class_exists(\App\Models\User::class)) { - return \App\Models\User::findMany($ids); - } + return $this; + } - if (class_exists(\App\User::class)) { - return \App\User::findMany($ids); + /** + * Store the queued entries and flush the queue. + */ + public function store(): self + { + if (! $this->shouldRecord) { + return $this->clearQueue(); } - return $ids->map(fn ($id) => [ - 'id' => $id, - ]); + $this->rescue(fn () => $this->ingest->ingest( + $this->entriesQueue->filter($this->shouldRecord(...)), + $this->updatesQueue->filter($this->shouldRecord(...)), + )); + + $this->rescue(fn () => Lottery::odds(...$this->config['ingest']['lottery']) + ->winner(fn () => $this->ingest->trim()) + ->choose()); + + return $this->clearQueue(); } /** - * Record the given entry. + * Determine if the entry should be recorded. */ - public function record(Entry $entry): void + protected function shouldRecord(Entry|Update $entry): bool { - if ($this->shouldRecord($entry)) { - $this->entriesQueue[$entry->table][] = $entry->attributes; - } + return $this->filters->every(fn ($filter) => $filter($entry)); } /** - * Record the given entry update. + * Resolve the user's details using the given closure. + * + * @param (callable(\Illuminate\Support\Collection): iterable) $callback */ - public function recordUpdate(Update $update): void + public function resolveUsersUsing(callable $callback): self { - if ($this->shouldRecord($update)) { - $this->updatesQueue[] = $update; - } + $this->usersResolver = $callback; + + return $this; } /** - * Store the queued entries and flush the queue. + * Resolve the user's details using the given closure. + * + * @param \Illuminate\Support\Collection $ids + * @return \Illuminate\Support\Collection */ - public function store(): void + public function resolveUsers(Collection $ids): Collection { - $this->ingest->ingestSilently( - $this->entriesQueue, $this->updatesQueue, - ); + if ($this->usersResolver) { + return collect(($this->usersResolver)($ids)); + } - $this->entriesQueue = $this->updatesQueue = []; + if (class_exists(\App\Models\User::class)) { + return \App\Models\User::whereKey($ids)->get(['name', 'email']); + } - // TODO: lottery configuration? - Lottery::odds(1, 100) - ->winner(fn () => $this->ingest->trimSilently((new CarbonImmutable)->subWeek())) - ->choose(); + if (class_exists(\App\User::class)) { + return \App\User::whereKey($ids)->get(['name', 'email']); + } + + return $ids->map(fn ($id) => ['name' => "User ID: {$id}"]); } /** @@ -169,15 +209,15 @@ public function js(): string */ public function check(Request $request): bool { - return ($this->authUsing ?: function () { - return App::environment('local'); - })($request); + return ($this->authUsing ?: fn () => App::environment('local'))($request); } /** * Set the callback that should be used to authorize Pulse users. + * + * @param (callable(\Illuminate\Http\Request): bool) $callback */ - public function auth(Closure $callback): self + public function auth(callable $callback): self { $this->authUsing = $callback; @@ -203,10 +243,41 @@ public function runsMigrations(): bool } /** - * Determine if the entry should be recorded. + * Handle exceptions using the given callback. + * + * @param (callable(\Throwable): mixed) $callback */ - protected function shouldRecord(Entry|Update $entry): bool + public function handleExceptionsUsing(callable $callback): self + { + $this->handleExceptionsUsing = $callback; + + return $this; + } + + /** + * Execute the given callback handling any exceptions. + * + * @param (callable(): mixed) $callback + */ + public function rescue(callable $callback): void { - return $this->shouldRecord && $this->filters->every(fn (callable $filter) => $filter($entry)); + try { + $callback(); + } catch (Throwable $e) { + // TODO is this a good default? + ($this->handleExceptionsUsing ?? fn () => null)($e); + } + } + + /** + * Clear any pending entries on the queue. + */ + protected function clearQueue(): self + { + $this->entriesQueue = collect([]); + + $this->updatesQueue = collect([]); + + return $this; } } diff --git a/src/PulseServiceProvider.php b/src/PulseServiceProvider.php index eec480eb..39cdb402 100644 --- a/src/PulseServiceProvider.php +++ b/src/PulseServiceProvider.php @@ -12,7 +12,9 @@ use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; use Illuminate\Queue\Events\JobQueued; +use Illuminate\Support\Arr; use Illuminate\Support\Facades\Blade; +use Illuminate\Support\Facades\Config; use Illuminate\Support\Facades\Event; use Illuminate\Support\Facades\Http; use Illuminate\Support\Facades\Route; @@ -20,15 +22,17 @@ use Laravel\Pulse\Commands\CheckCommand; use Laravel\Pulse\Commands\RestartCommand; use Laravel\Pulse\Commands\WorkCommand; +use Laravel\Pulse\Contracts\Ingest; use Laravel\Pulse\Contracts\ShouldNotReportUsage; +use Laravel\Pulse\Contracts\Storage; use Laravel\Pulse\Handlers\HandleCacheInteraction; use Laravel\Pulse\Handlers\HandleException; use Laravel\Pulse\Handlers\HandleHttpRequest; +use Laravel\Pulse\Handlers\HandleOutgoingRequest; use Laravel\Pulse\Handlers\HandleProcessedJob; use Laravel\Pulse\Handlers\HandleProcessingJob; use Laravel\Pulse\Handlers\HandleQuery; use Laravel\Pulse\Handlers\HandleQueuedJob; -use Laravel\Pulse\Handlers\HttpRequestMiddleware; use Laravel\Pulse\Http\Livewire\Cache; use Laravel\Pulse\Http\Livewire\Exceptions; use Laravel\Pulse\Http\Livewire\PeriodSelector; @@ -40,6 +44,9 @@ use Laravel\Pulse\Http\Livewire\SlowRoutes; use Laravel\Pulse\Http\Livewire\Usage; use Laravel\Pulse\Http\Middleware\Authorize; +use Laravel\Pulse\Ingests\Redis as RedisIngest; +use Laravel\Pulse\Ingests\Storage as StorageIngest; +use Laravel\Pulse\Storage\Database; use Laravel\Pulse\View\Components\Pulse as PulseComponent; use Livewire\Livewire; @@ -54,9 +61,39 @@ public function register(): void return; } - $this->app->singleton(Pulse::class, fn ($app) => new Pulse($app[config('pulse.ingest')])); + $this->app->singleton(Pulse::class, fn ($app) => new Pulse(Config::get('pulse'), $app[Ingest::class])); - $this->app->scoped(Redis::class, fn () => new Redis(app('redis')->connection())); + $this->app->singleton(Storage::class, function ($app) { + $driver = Config::get('pulse.storage.driver'); + + $config = [ + ...Arr::only(Config::get('pulse.storage'), ['retain']), + ...Config::get("pulse.storage.{$driver}"), + ]; + + return new Database($config, $app['db']); + }); + + $this->app->singleton(Ingest::class, function ($app) { + $driver = Config::get('pulse.ingest.driver'); + + if ($driver === 'storage') { + return $app[StorageIngest::class]; + } + + $ingestConfig = [ + ...Arr::only(Config::get('pulse.ingest'), ['retain', 'lottery']), + ...Config::get("pulse.ingest.{$driver}"), + ]; + + $redisConfig = [ + ...Config::get('database.redis.options'), + ...Config::get("database.redis.{$ingestConfig['connection']}"), + ...$ingestConfig, + ]; + + return new RedisIngest($ingestConfig, new Redis($redisConfig, $app['redis'])); + }); $this->mergeConfigFrom( __DIR__.'/../config/pulse.php', 'pulse' @@ -115,7 +152,7 @@ protected function listenForEvents(): void ], HandleProcessedJob::class); if (method_exists(Factory::class, 'globalMiddleware')) { - Http::globalMiddleware(new HttpRequestMiddleware); + Http::globalMiddleware(new HandleOutgoingRequest); } // TODO: Telescope passes the container like this, but I'm unsure how it works with Octane. @@ -129,17 +166,14 @@ protected function listenForEvents(): void protected function registerRoutes(): void { Route::group([ - 'domain' => config('pulse.domain', null), - 'middleware' => config('pulse.middleware', 'web'), - 'namespace' => 'Laravel\Pulse\Http\Controllers', - 'prefix' => config('pulse.path'), - ], function () { - Route::get('/', function (Pulse $pulse) { - $pulse->shouldRecord = false; - - return view('pulse::dashboard'); - }); - }); + 'domain' => Config::get('pulse.domain', null), + 'middleware' => Config::get('pulse.middleware', 'web'), + 'prefix' => Config::get('pulse.path'), + ], fn () => Route::get('/', function (Pulse $pulse) { + $pulse->shouldNotRecord(); + + return view('pulse::dashboard'); + })); } /** diff --git a/src/Redis.php b/src/Redis.php index 122ca068..84daa607 100644 --- a/src/Redis.php +++ b/src/Redis.php @@ -2,8 +2,8 @@ namespace Laravel\Pulse; -use Carbon\CarbonImmutable; use Illuminate\Redis\Connections\Connection; +use Illuminate\Redis\RedisManager; use Predis\Client as Predis; use Predis\Pipeline\Pipeline; use Redis as PhpRedis; @@ -12,18 +12,21 @@ /** * @mixin \Redis * @mixin \Predis\Client + * + * @internal */ class Redis { /** * Create a new Redis instance. * + * @param array{connection: string, prefix: string} $config * @param \Redis|\Predis\Client|\Predis\Pipeline\Pipeline|null $client */ - public function __construct(protected ?Connection $connection = null, protected $client = null) + public function __construct(protected array $config, protected ?RedisManager $manager = null, protected $client = null) { - if (array_filter(func_get_args()) === []) { - throw new RuntimeException('Must provider a connection or client.'); + if ($manager === null && $client === null) { + throw new RuntimeException('Must provider a manager or client.'); } } @@ -32,7 +35,7 @@ public function __construct(protected ?Connection $connection = null, protected */ public function xadd($key, $dictionary) { - if ($this->isPhpRedis()) { + if ($this->client() instanceof PhpRedis) { return $this->client()->xAdd($key, '*', $dictionary); } @@ -52,11 +55,9 @@ public function xrange($key, $start, $end, $count = null) */ public function xtrim($key, $strategy, $strategyModifier, $threshold) { - $prefix = config('database.redis.options.prefix'); - - if ($this->isPhpRedis()) { + if ($this->client() instanceof PhpRedis) { // PHP Redis does not support the minid strategy. - return $this->client()->rawCommand('XTRIM', $prefix.$key, $strategy, $strategyModifier, $threshold); + return $this->client()->rawCommand('XTRIM', $this->config['prefix'].$key, $strategy, $strategyModifier, $threshold); } return $this->client()->xtrim($key, [$strategy, $strategyModifier], $threshold); @@ -69,48 +70,22 @@ public function pipeline(callable $closure): array { // TODO explain this code - lol // ensure we run against a connection... - return $this->connection->pipeline(function ($redis) use ($closure) { - $closure(new self(client: $redis)); + return $this->connection()->pipeline(function ($redis) use ($closure) { + $closure(new self($this->config, client: $redis)); }); } /** - * Get the ID of the stream at a given time. - */ - public function streamIdAt(CarbonImmutable $timestamp): string - { - // TODO: pass through intervals rather than date instances everywhere - $diff = (new CarbonImmutable)->diffInMilliseconds($timestamp); - - $redisTime = $this->client()->time(); - - $redisTimestamp = $redisTime[0].substr($redisTime[1], 0, 3); - - return (string) ($redisTimestamp - $diff); - } - - /** - * Determine if the client is PhpRedis. - */ - protected function isPhpRedis(): bool - { - return $this->client() instanceof PhpRedis; - } - - /** - * Determine if the client is Predis. + * The connections client. */ - protected function isPredis(): bool + protected function client(): PhpRedis|Predis|Pipeline { - return $this->client() instanceof Predis; + return $this->connection()?->client() ?? $this->client; } - /** - * The connections client. - */ - protected function client(): PhpRedis|Predis|Pipeline + protected function connection(): ?Connection { - return $this->connection?->client() ?? $this->client; + return $this->manager?->connection($this->config['connection']); } /** @@ -118,6 +93,6 @@ protected function client(): PhpRedis|Predis|Pipeline */ public function __call(string $method, array $parameters): mixed { - return ($this->connection ?? $this->client)->{$method}(...$parameters); + return ($this->connection() ?? $this->client)->{$method}(...$parameters); } } diff --git a/src/Storage/Database.php b/src/Storage/Database.php new file mode 100644 index 00000000..02efe2e7 --- /dev/null +++ b/src/Storage/Database.php @@ -0,0 +1,74 @@ + $entries + * @param \Illuminate\Support\Collection $updates + */ + public function store(Collection $entries, Collection $updates): void + { + if ($entries->isEmpty() && $updates->isEmpty()) { + return; + } + + $this->connection()->transaction(function () use ($entries, $updates) { + $entries->groupBy('table.value') + ->each(fn ($rows, $table) => $rows->chunk(1000) + ->map(fn ($inserts) => $inserts->pluck('attributes')->all()) + ->each($this->connection()->table($table)->insert(...))); + + $updates->each(fn ($update) => $update->perform($this->connection())); + }); + } + + /** + * Trim the stored entries. + */ + public function trim(): void + { + Table::all() + ->each(fn ($table) => $this->connection() + ->table($table->value) + ->where('date', '<', (new CarbonImmutable)->subSeconds((int) $this->trimAfter()->totalSeconds)->toDateTimeString()) + ->delete()); + } + + /** + * The interval to trim the storage to. + */ + protected function trimAfter(): Interval + { + return new Interval($this->config['retain'] ?? 'P7D'); + } + + /** + * Get the database connection. + */ + protected function connection(): Connection + { + return $this->manager->connection($this->config['connection']); + } +}