diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 080571cd..14a758a7 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -10,7 +10,7 @@ on: - cron: '0 0 * * *' jobs: - tests: + mysql: runs-on: ubuntu-22.04 services: @@ -18,7 +18,7 @@ jobs: image: mysql:5.7 env: MYSQL_ALLOW_EMPTY_PASSWORD: yes - MYSQL_DATABASE: testing + MYSQL_DATABASE: forge ports: - 3306:3306 options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 @@ -35,7 +35,7 @@ jobs: laravel: [10] stability: [prefer-lowest, prefer-stable] - name: PHP ${{ matrix.php }} - Laravel ${{ matrix.laravel }} - Stability ${{ matrix.stability }} + name: PHP ${{ matrix.php }} - Laravel ${{ matrix.laravel }} - Stability ${{ matrix.stability }} - MySQL 5.7 steps: - name: Checkout code @@ -62,3 +62,57 @@ jobs: env: DB_CONNECTION: mysql DB_USERNAME: root + + pgsql: + runs-on: ubuntu-22.04 + + services: + postgresql: + image: postgres:14 + env: + POSTGRES_DB: forge + POSTGRES_USER: forge + POSTGRES_PASSWORD: password + ports: + - 5432:5432 + options: --health-cmd=pg_isready --health-interval=10s --health-timeout=5s --health-retries=3 + redis: + image: redis + ports: + - 6379:6379 + options: --entrypoint redis-server + + strategy: + fail-fast: true + matrix: + php: [8.2, 8.3] + laravel: [10] + + name: PHP ${{ matrix.php }} - Laravel ${{ matrix.laravel }} - Stability ${{ matrix.stability }} - PostgreSQL 14 + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + extensions: dom, curl, libxml, mbstring, redis, pcntl, zip + ini-values: error_reporting=E_ALL + tools: composer:v2 + coverage: none + + - name: Install redis-cli + run: sudo apt-get install -qq redis-tools + + - name: Install dependencies + run: | + composer require "illuminate/contracts=^${{ matrix.laravel }}" --dev --no-update + composer update --prefer-dist --no-interaction --no-progress + + - name: Execute tests + run: vendor/bin/pest + env: + DB_CONNECTION: pgsql + DB_PASSWORD: password diff --git a/database/migrations/2023_06_07_000001_create_pulse_tables.php b/database/migrations/2023_06_07_000001_create_pulse_tables.php index 39141738..7efe3b1f 100644 --- a/database/migrations/2023_06_07_000001_create_pulse_tables.php +++ b/database/migrations/2023_06_07_000001_create_pulse_tables.php @@ -3,6 +3,7 @@ use Illuminate\Database\Migrations\Migration; use Illuminate\Database\Schema\Blueprint; use Illuminate\Support\Facades\Config; +use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Schema; return new class extends Migration @@ -20,25 +21,35 @@ public function getConnection(): ?string */ public function up(): void { - Schema::create('pulse_values', function (Blueprint $table) { + $connection = $this->getConnection() ?? DB::connection(); + + Schema::create('pulse_values', function (Blueprint $table) use ($connection) { $table->engine = 'InnoDB'; $table->unsignedInteger('timestamp'); $table->string('type'); $table->text('key'); - $table->char('key_hash', 16)->charset('binary')->virtualAs('unhex(md5(`key`))'); + match ($driver = $connection->getDriverName()) { + 'mysql' => $table->char('key_hash', 16)->charset('binary')->virtualAs('unhex(md5(`key`))'), + 'pgsql' => $table->uuid('key_hash')->storedAs('md5("key")::uuid'), + default => throw new RuntimeException("Unsupported database driver [{$driver}]."), + }; $table->text('value'); $table->index('timestamp'); // For trimming... $table->index('type'); // For fast lookups and purging... - $table->unique(['type', 'key_hash']); // For data integrity... + $table->unique(['type', 'key_hash']); // For data integrity and upserts... }); - Schema::create('pulse_entries', function (Blueprint $table) { + Schema::create('pulse_entries', function (Blueprint $table) use ($connection) { $table->engine = 'InnoDB'; $table->unsignedInteger('timestamp'); $table->string('type'); $table->text('key'); - $table->char('key_hash', 16)->charset('binary')->virtualAs('unhex(md5(`key`))'); + match ($driver = $connection->getDriverName()) { + 'mysql' => $table->char('key_hash', 16)->charset('binary')->virtualAs('unhex(md5(`key`))'), + 'pgsql' => $table->uuid('key_hash')->storedAs('md5("key")::uuid'), + default => throw new RuntimeException("Unsupported database driver [{$driver}]."), + }; $table->bigInteger('value')->nullable(); $table->index('timestamp'); // For trimming... @@ -47,13 +58,17 @@ public function up(): void $table->index(['timestamp', 'type', 'key_hash', 'value']); // For aggregate queries... }); - Schema::create('pulse_aggregates', function (Blueprint $table) { + Schema::create('pulse_aggregates', function (Blueprint $table) use ($connection) { $table->engine = 'InnoDB'; $table->unsignedInteger('bucket'); $table->unsignedMediumInteger('period'); $table->string('type'); $table->text('key'); - $table->char('key_hash', 16)->charset('binary')->virtualAs('unhex(md5(`key`))'); + match ($driver = $connection->getDriverName()) { + 'mysql' => $table->char('key_hash', 16)->charset('binary')->virtualAs('unhex(md5(`key`))'), + 'pgsql' => $table->uuid('key_hash')->storedAs('md5("key")::uuid'), + default => throw new RuntimeException("Unsupported database driver [{$driver}]."), + }; $table->string('aggregate'); $table->decimal('value', 20, 2); $table->unsignedInteger('count')->nullable(); diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 8cc657da..0749980f 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -10,8 +10,5 @@ - - - diff --git a/src/Storage/DatabaseStorage.php b/src/Storage/DatabaseStorage.php index 771fd977..66c12fcc 100644 --- a/src/Storage/DatabaseStorage.php +++ b/src/Storage/DatabaseStorage.php @@ -8,16 +8,16 @@ use Illuminate\Database\Connection; use Illuminate\Database\DatabaseManager; use Illuminate\Database\Query\Builder; -use Illuminate\Support\Arr; +use Illuminate\Database\Query\Expression; use Illuminate\Support\Collection; -use Illuminate\Support\LazyCollection; use InvalidArgumentException; use Laravel\Pulse\Contracts\Storage; use Laravel\Pulse\Entry; use Laravel\Pulse\Value; +use RuntimeException; /** - * @phpstan-type AggregateRow array{bucket: int, period: int, type: string, aggregate: string, key: string, value: int, count: int} + * @phpstan-type AggregateRow array{bucket: int, period: int, type: string, aggregate: string, key: string, value: int|float, count?: int} * * @internal */ @@ -55,25 +55,18 @@ public function store(Collection $items): void ->insert($chunk->map->attributes()->all()) ); - $periods = [ - (int) (CarbonInterval::hour()->totalSeconds / 60), - (int) (CarbonInterval::hours(6)->totalSeconds / 60), - (int) (CarbonInterval::hours(24)->totalSeconds / 60), - (int) (CarbonInterval::days(7)->totalSeconds / 60), - ]; - $this - ->aggregateAttributes($entries->filter->isCount(), $periods, 'count') + ->aggregateCounts($entries->filter->isCount()) ->chunk($this->config->get('pulse.storage.database.chunk')) ->each(fn ($chunk) => $this->upsertCount($chunk->all())); $this - ->aggregateAttributes($entries->filter->isMax(), $periods, 'max') + ->aggregateMaximums($entries->filter->isMax()) ->chunk($this->config->get('pulse.storage.database.chunk')) ->each(fn ($chunk) => $this->upsertMax($chunk->all())); $this - ->aggregateAttributes($entries->filter->isAvg(), $periods, 'avg') + ->aggregateAverages($entries->filter->isAvg()) ->chunk($this->config->get('pulse.storage.database.chunk')) ->each(fn ($chunk) => $this->upsertAvg($chunk->all())); @@ -147,11 +140,18 @@ public function purge(array $types = null): void * * @param list $values */ - protected function upsertCount(array $values): bool + protected function upsertCount(array $values): int { - return $this->upsert( + return $this->connection()->table('pulse_aggregates')->upsert( $values, - 'on duplicate key update `value` = `value` + 1' + ['bucket', 'period', 'type', 'aggregate', 'key_hash'], + [ + 'value' => match ($driver = $this->connection()->getDriverName()) { + 'mysql' => new Expression('`value` + values(`value`)'), + 'pgsql' => new Expression('"pulse_aggregates"."value" + "excluded"."value"'), + default => throw new RuntimeException("Unsupported database driver [{$driver}]"), + }, + ] ); } @@ -160,11 +160,18 @@ protected function upsertCount(array $values): bool * * @param list $values */ - protected function upsertMax(array $values): bool + protected function upsertMax(array $values): int { - return $this->upsert( + return $this->connection()->table('pulse_aggregates')->upsert( $values, - 'on duplicate key update `value` = greatest(`value`, values(`value`))' + ['bucket', 'period', 'type', 'aggregate', 'key_hash'], + [ + 'value' => match ($driver = $this->connection()->getDriverName()) { + 'mysql' => new Expression('greatest(`value`, values(`value`))'), + 'pgsql' => new Expression('greatest("pulse_aggregates"."value", "excluded"."value")'), + default => throw new RuntimeException("Unsupported database driver [{$driver}]"), + }, + ] ); } @@ -173,66 +180,157 @@ protected function upsertMax(array $values): bool * * @param list $values */ - protected function upsertAvg(array $values): bool + protected function upsertAvg(array $values): int { - return $this->upsert( + return $this->connection()->table('pulse_aggregates')->upsert( $values, - ' on duplicate key update `value` = (`value` * `count` + values(`value`)) / (`count` + 1), `count` = `count` + 1', + ['bucket', 'period', 'type', 'aggregate', 'key_hash'], + match ($driver = $this->connection()->getDriverName()) { + 'mysql' => [ + 'value' => new Expression('(`value` * `count` + (values(`value`) * values(`count`))) / (`count` + values(`count`))'), + 'count' => new Expression('`count` + values(`count`)'), + ], + 'pgsql' => [ + 'value' => new Expression('("pulse_aggregates"."value" * "pulse_aggregates"."count" + ("excluded"."value" * "excluded"."count")) / ("pulse_aggregates"."count" + "excluded"."count")'), + 'count' => new Expression('"pulse_aggregates"."count" + "excluded"."count"'), + ], + default => throw new RuntimeException("Unsupported database driver [{$driver}]"), + } ); } /** - * Perform an "upsert" query with an "on duplicate key" clause. + * Get the count aggregates * - * @param list $values + * @param \Illuminate\Support\Collection $entries + * @return \Illuminate\Support\Collection */ - protected function upsert(array $values, string $onDuplicateKeyClause): bool + protected function aggregateCounts(Collection $entries): Collection { - $grammar = $this->connection()->getQueryGrammar(); + $aggregates = []; - $sql = $grammar->compileInsert( - $this->connection()->table('pulse_aggregates'), - $values - ); + foreach ($entries as $entry) { + foreach ($this->periods() as $period) { + // Exclude entries that would be trimmed. + if ($entry->timestamp < CarbonImmutable::now()->subMinutes($period)->getTimestamp()) { + continue; + } - $sql .= ' '.$onDuplicateKeyClause; + $bucket = (int) (floor($entry->timestamp / $period) * $period); + + $key = $entry->type.':'.$period.':'.$bucket.':'.$entry->key; + + if (! isset($aggregates[$key])) { + $aggregates[$key] = [ + 'bucket' => $bucket, + 'period' => $period, + 'type' => $entry->type, + 'aggregate' => 'count', + 'key' => $entry->key, + 'value' => 1, + ]; + } else { + $aggregates[$key]['value']++; + } + } + } - return $this->connection()->statement($sql, Arr::flatten($values, 1)); + return collect(array_values($aggregates)); } /** - * Get the aggregate attributes for the collection. + * Get the maximum aggregates * * @param \Illuminate\Support\Collection $entries - * @param list $periods - * @return \Illuminate\Support\LazyCollection + * @return \Illuminate\Support\Collection */ - protected function aggregateAttributes(Collection $entries, array $periods, string $aggregateSuffix): LazyCollection + protected function aggregateMaximums(Collection $entries): Collection { - return LazyCollection::make(function () use ($entries, $periods, $aggregateSuffix) { - foreach ($entries as $entry) { - foreach ($periods as $period) { - // Exclude entries that would be trimmed. - if ($entry->timestamp < CarbonImmutable::now()->subMinutes($period)->getTimestamp()) { - continue; - } + $aggregates = []; + + foreach ($entries as $entry) { + foreach ($this->periods() as $period) { + // Exclude entries that would be trimmed. + if ($entry->timestamp < CarbonImmutable::now()->subMinutes($period)->getTimestamp()) { + continue; + } + + $bucket = (int) (floor($entry->timestamp / $period) * $period); + + $key = $entry->type.':'.$period.':'.$bucket.':'.$entry->key; - yield [ - 'bucket' => (int) (floor($entry->timestamp / $period) * $period), + if (! isset($aggregates[$key])) { + $aggregates[$key] = [ + 'bucket' => $bucket, 'period' => $period, 'type' => $entry->type, - 'aggregate' => $aggregateSuffix, + 'aggregate' => 'max', 'key' => $entry->key, - 'value' => $aggregateSuffix === 'count' - ? 1 - : $entry->value, - ...($aggregateSuffix === 'avg') - ? ['count' => 1] - : [], + 'value' => (int) $entry->value, ]; + } else { + $aggregates[$key]['value'] = (int) max($aggregates[$key]['value'], $entry->value); } } - }); + } + + return collect(array_values($aggregates)); + } + + /** + * Get the average aggregates + * + * @param \Illuminate\Support\Collection $entries + * @return \Illuminate\Support\Collection + */ + protected function aggregateAverages(Collection $entries): Collection + { + $aggregates = []; + + foreach ($entries as $entry) { + foreach ($this->periods() as $period) { + // Exclude entries that would be trimmed. + if ($entry->timestamp < CarbonImmutable::now()->subMinutes($period)->getTimestamp()) { + continue; + } + + $bucket = (int) (floor($entry->timestamp / $period) * $period); + + $key = $entry->type.':'.$period.':'.$bucket.':'.$entry->key; + + if (! isset($aggregates[$key])) { + $aggregates[$key] = [ + 'bucket' => $bucket, + 'period' => $period, + 'type' => $entry->type, + 'aggregate' => 'avg', + 'key' => $entry->key, + 'value' => (int) $entry->value, + 'count' => 1, + ]; + } else { + $aggregates[$key]['value'] = ($aggregates[$key]['value'] * $aggregates[$key]['count'] + $entry->value) / ($aggregates[$key]['count'] + 1); + $aggregates[$key]['count']++; + } + } + } + + return collect(array_values($aggregates)); + } + + /** + * The periods to aggregate for. + * + * @return list + */ + protected function periods(): array + { + return [ + (int) (CarbonInterval::hour()->totalSeconds / 60), + (int) (CarbonInterval::hours(6)->totalSeconds / 60), + (int) (CarbonInterval::hours(24)->totalSeconds / 60), + (int) (CarbonInterval::days(7)->totalSeconds / 60), + ]; } /** @@ -346,10 +444,10 @@ public function aggregate( foreach ($aggregates as $aggregate) { $query->selectRaw(match ($aggregate) { - 'count' => 'sum(`count`)', - 'max' => 'max(`max`)', - 'avg' => 'avg(`avg`)', - }." as `{$aggregate}`"); + 'count' => "sum({$this->wrap('count')})", + 'max' => "max({$this->wrap('max')})", + 'avg' => "avg({$this->wrap('avg')})", + }." as {$this->wrap($aggregate)}"); } $query->fromSub(function (Builder $query) use ($type, $aggregates, $interval) { @@ -365,9 +463,9 @@ public function aggregate( foreach ($aggregates as $aggregate) { $query->selectRaw(match ($aggregate) { 'count' => 'count(*)', - 'max' => 'max(`value`)', - 'avg' => 'avg(`value`)', - }." as `{$aggregate}`"); + 'max' => "max({$this->wrap('value')})", + 'avg' => "avg({$this->wrap('value')})", + }." as {$this->wrap($aggregate)}"); } $query @@ -385,12 +483,12 @@ public function aggregate( foreach ($aggregates as $aggregate) { if ($aggregate === $currentAggregate) { $query->selectRaw(match ($aggregate) { - 'count' => 'sum(`value`)', - 'max' => 'max(`value`)', - 'avg' => 'avg(`value`)', - }." as `$aggregate`"); + 'count' => "sum({$this->wrap('value')})", + 'max' => "max({$this->wrap('value')})", + 'avg' => "avg({$this->wrap('value')})", + }." as {$this->wrap($aggregate)}"); } else { - $query->selectRaw("null as `$aggregate`"); + $query->selectRaw("null as {$this->wrap($aggregate)}"); } } @@ -448,10 +546,10 @@ public function aggregateTypes( foreach ($types as $type) { $query->selectRaw(match ($aggregate) { - 'count' => 'sum(`'.$type.'`)', - 'max' => 'max(`'.$type.'`)', - 'avg' => 'avg(`'.$type.'`)', - }." as `{$type}`"); + 'count' => "sum({$this->wrap($type)})", + 'max' => "max({$this->wrap($type)})", + 'avg' => "avg({$this->wrap($type)})", + }." as {$this->wrap($type)}"); } $query->fromSub(function (Builder $query) use ($types, $aggregate, $interval) { @@ -466,10 +564,10 @@ public function aggregateTypes( foreach ($types as $type) { $query->selectRaw(match ($aggregate) { - 'count' => 'count(case when (`type` = ?) then true else null end)', - 'max' => 'max(case when (`type` = ?) then `value` else null end)', - 'avg' => 'avg(case when (`type` = ?) then `value` else null end)', - }." as `{$type}`", [$type]); + 'count' => "count(case when ({$this->wrap('type')} = ?) then true else null end)", + 'max' => "max(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)", + 'avg' => "avg(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)", + }." as {$this->wrap($type)}", [$type]); } $query @@ -485,10 +583,10 @@ public function aggregateTypes( foreach ($types as $type) { $query->selectRaw(match ($aggregate) { - 'count' => 'sum(case when (`type` = ?) then `value` else null end)', - 'max' => 'max(case when (`type` = ?) then `value` else null end)', - 'avg' => 'avg(case when (`type` = ?) then `value` else null end)', - }." as `{$type}`", [$type]); + 'count' => "sum(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)", + 'max' => "max(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)", + 'avg' => "avg(case when ({$this->wrap('type')} = ?) then {$this->wrap('value')} else null end)", + }." as {$this->wrap($type)}", [$type]); } $query @@ -536,18 +634,18 @@ public function aggregateTotal( return $this->connection()->query() ->addSelect('type') ->selectRaw(match ($aggregate) { - 'count' => 'sum(`count`)', - 'max' => 'max(`max`)', - 'avg' => 'avg(`avg`)', - }." as `{$aggregate}`") + 'count' => "sum({$this->wrap('count')})", + 'max' => "max({$this->wrap('max')})", + 'avg' => "avg({$this->wrap('avg')})", + }." as {$this->wrap($aggregate)}") ->fromSub(fn (Builder $query) => $query // Tail ->addSelect('type') ->selectRaw(match ($aggregate) { 'count' => 'count(*)', - 'max' => 'max(`value`)', - 'avg' => 'avg(`value`)', - }." as `{$aggregate}`") + 'max' => "max({$this->wrap('value')})", + 'avg' => "avg({$this->wrap('value')})", + }." as {$this->wrap($aggregate)}") ->from('pulse_entries') ->whereIn('type', $types) ->where('timestamp', '>=', $tailStart) @@ -557,10 +655,10 @@ public function aggregateTotal( ->unionAll(fn (Builder $query) => $query ->select('type') ->selectRaw(match ($aggregate) { - 'count' => 'sum(`value`)', - 'max' => 'max(`value`)', - 'avg' => 'avg(`value`)', - }."as `{$aggregate}`") + 'count' => "sum({$this->wrap('value')})", + 'max' => "max({$this->wrap('value')})", + 'avg' => "avg({$this->wrap('value')})", + }." as {$this->wrap($aggregate)}") ->from('pulse_aggregates') ->where('period', $period) ->whereIn('type', $types) @@ -581,4 +679,12 @@ protected function connection(): Connection { return $this->db->connection($this->config->get('pulse.storage.database.connection')); } + + /** + * Wrap a value in keyword identifiers. + */ + protected function wrap(string $value): string + { + return $this->connection()->getQueryGrammar()->wrap($value); + } } diff --git a/tests/Feature/Recorders/SlowRequestsTest.php b/tests/Feature/Recorders/SlowRequestsTest.php index b0e9f38a..6e93ed48 100644 --- a/tests/Feature/Recorders/SlowRequestsTest.php +++ b/tests/Feature/Recorders/SlowRequestsTest.php @@ -28,7 +28,7 @@ expect($entries[0]->timestamp)->toBe(946782245); expect($entries[0]->type)->toBe('slow_request'); expect($entries[0]->key)->toBe(json_encode(['GET', '/test-route', 'Closure'])); - expect($entries[0]->key_hash)->toBe(hex2bin(md5(json_encode(['GET', '/test-route', 'Closure'])))); + expect($entries[0]->key_hash)->toBe(keyHash(json_encode(['GET', '/test-route', 'Closure']))); expect($entries[0]->value)->toBe(4000); $aggregates = Pulse::ignore(fn () => DB::table('pulse_aggregates')->orderBy('type')->orderBy('period')->orderBy('aggregate')->get()); @@ -39,7 +39,7 @@ expect($aggregates[0]->type)->toBe('slow_request'); expect($aggregates[0]->aggregate)->toBe('count'); expect($aggregates[0]->key)->toBe(json_encode(['GET', '/test-route', 'Closure'])); - expect($aggregates[0]->key_hash)->toBe(hex2bin(md5(json_encode(['GET', '/test-route', 'Closure'])))); + expect($aggregates[0]->key_hash)->toBe(keyHash(json_encode(['GET', '/test-route', 'Closure']))); expect($aggregates[0]->value)->toBe('1.00'); expect($aggregates[1]->bucket)->toBe(946782240); @@ -47,7 +47,7 @@ expect($aggregates[1]->type)->toBe('slow_request'); expect($aggregates[1]->aggregate)->toBe('max'); expect($aggregates[1]->key)->toBe(json_encode(['GET', '/test-route', 'Closure'])); - expect($aggregates[1]->key_hash)->toBe(hex2bin(md5(json_encode(['GET', '/test-route', 'Closure'])))); + expect($aggregates[1]->key_hash)->toBe(keyHash(json_encode(['GET', '/test-route', 'Closure']))); expect($aggregates[1]->value)->toBe('4000.00'); expect($aggregates[2]->bucket)->toBe(946782000); @@ -55,7 +55,7 @@ expect($aggregates[2]->type)->toBe('slow_request'); expect($aggregates[2]->aggregate)->toBe('count'); expect($aggregates[2]->key)->toBe(json_encode(['GET', '/test-route', 'Closure'])); - expect($aggregates[2]->key_hash)->toBe(hex2bin(md5(json_encode(['GET', '/test-route', 'Closure'])))); + expect($aggregates[2]->key_hash)->toBe(keyHash(json_encode(['GET', '/test-route', 'Closure']))); expect($aggregates[2]->value)->toBe('1.00'); expect($aggregates[3]->bucket)->toBe(946782000); @@ -63,7 +63,7 @@ expect($aggregates[3]->type)->toBe('slow_request'); expect($aggregates[3]->aggregate)->toBe('max'); expect($aggregates[3]->key)->toBe(json_encode(['GET', '/test-route', 'Closure'])); - expect($aggregates[3]->key_hash)->toBe(hex2bin(md5(json_encode(['GET', '/test-route', 'Closure'])))); + expect($aggregates[3]->key_hash)->toBe(keyHash(json_encode(['GET', '/test-route', 'Closure']))); expect($aggregates[3]->value)->toBe('4000.00'); expect($aggregates[4]->bucket)->toBe(946781280); @@ -71,7 +71,7 @@ expect($aggregates[4]->type)->toBe('slow_request'); expect($aggregates[4]->aggregate)->toBe('count'); expect($aggregates[4]->key)->toBe(json_encode(['GET', '/test-route', 'Closure'])); - expect($aggregates[4]->key_hash)->toBe(hex2bin(md5(json_encode(['GET', '/test-route', 'Closure'])))); + expect($aggregates[4]->key_hash)->toBe(keyHash(json_encode(['GET', '/test-route', 'Closure']))); expect($aggregates[4]->value)->toBe('1.00'); expect($aggregates[5]->bucket)->toBe(946781280); @@ -79,21 +79,21 @@ expect($aggregates[5]->type)->toBe('slow_request'); expect($aggregates[5]->aggregate)->toBe('max'); expect($aggregates[5]->key)->toBe(json_encode(['GET', '/test-route', 'Closure'])); - expect($aggregates[5]->key_hash)->toBe(hex2bin(md5(json_encode(['GET', '/test-route', 'Closure'])))); + expect($aggregates[5]->key_hash)->toBe(keyHash(json_encode(['GET', '/test-route', 'Closure']))); expect($aggregates[5]->value)->toBe('4000.00'); expect($aggregates[6]->period)->toBe(10080); expect($aggregates[6]->type)->toBe('slow_request'); expect($aggregates[6]->aggregate)->toBe('count'); expect($aggregates[6]->key)->toBe(json_encode(['GET', '/test-route', 'Closure'])); - expect($aggregates[6]->key_hash)->toBe(hex2bin(md5(json_encode(['GET', '/test-route', 'Closure'])))); + expect($aggregates[6]->key_hash)->toBe(keyHash(json_encode(['GET', '/test-route', 'Closure']))); expect($aggregates[6]->value)->toBe('1.00'); expect($aggregates[7]->period)->toBe(10080); expect($aggregates[7]->type)->toBe('slow_request'); expect($aggregates[7]->aggregate)->toBe('max'); expect($aggregates[7]->key)->toBe(json_encode(['GET', '/test-route', 'Closure'])); - expect($aggregates[7]->key_hash)->toBe(hex2bin(md5(json_encode(['GET', '/test-route', 'Closure'])))); + expect($aggregates[7]->key_hash)->toBe(keyHash(json_encode(['GET', '/test-route', 'Closure']))); expect($aggregates[7]->value)->toBe('4000.00'); Pulse::ignore(fn () => expect(DB::table('pulse_values')->count())->toBe(0)); @@ -114,7 +114,7 @@ expect($entries[0]->timestamp)->toBe(946782245); expect($entries[0]->type)->toBe('slow_user_request'); expect($entries[0]->key)->toBe('4321'); - expect($entries[0]->key_hash)->toBe(hex2bin(md5('4321'))); + expect($entries[0]->key_hash)->toBe(keyHash('4321')); expect($entries[0]->value)->toBeNull(); $aggregates = Pulse::ignore(fn () => DB::table('pulse_aggregates')->where('type', 'slow_user_request')->orderBy('period')->orderBy('aggregate')->get()); @@ -125,7 +125,7 @@ expect($aggregates[0]->type)->toBe('slow_user_request'); expect($aggregates[0]->aggregate)->toBe('count'); expect($aggregates[0]->key)->toBe('4321'); - expect($aggregates[0]->key_hash)->toBe(hex2bin(md5('4321'))); + expect($aggregates[0]->key_hash)->toBe(keyHash('4321')); expect($aggregates[0]->value)->toBe('1.00'); expect($aggregates[1]->bucket)->toBe(946782000); @@ -133,7 +133,7 @@ expect($aggregates[1]->type)->toBe('slow_user_request'); expect($aggregates[1]->aggregate)->toBe('count'); expect($aggregates[1]->key)->toBe('4321'); - expect($aggregates[1]->key_hash)->toBe(hex2bin(md5('4321'))); + expect($aggregates[1]->key_hash)->toBe(keyHash('4321')); expect($aggregates[1]->value)->toBe('1.00'); expect($aggregates[2]->bucket)->toBe(946781280); @@ -141,14 +141,14 @@ expect($aggregates[2]->type)->toBe('slow_user_request'); expect($aggregates[2]->aggregate)->toBe('count'); expect($aggregates[2]->key)->toBe('4321'); - expect($aggregates[2]->key_hash)->toBe(hex2bin(md5('4321'))); + expect($aggregates[2]->key_hash)->toBe(keyHash('4321')); expect($aggregates[2]->value)->toBe('1.00'); expect($aggregates[3]->period)->toBe(10080); expect($aggregates[3]->type)->toBe('slow_user_request'); expect($aggregates[3]->aggregate)->toBe('count'); expect($aggregates[3]->key)->toBe('4321'); - expect($aggregates[3]->key_hash)->toBe(hex2bin(md5('4321'))); + expect($aggregates[3]->key_hash)->toBe(keyHash('4321')); expect($aggregates[3]->value)->toBe('1.00'); Pulse::ignore(fn () => expect(DB::table('pulse_values')->count())->toBe(0)); @@ -268,7 +268,7 @@ expect($entries[0]->timestamp)->toBe(946782245); expect($entries[0]->type)->toBe('slow_request'); expect($entries[0]->key)->toBe(json_encode(['POST', '/test-route', 'via /livewire/update'])); - expect($entries[0]->key_hash)->toBe(hex2bin(md5(json_encode(['POST', '/test-route', 'via /livewire/update'])))); + expect($entries[0]->key_hash)->toBe(keyHash(json_encode(['POST', '/test-route', 'via /livewire/update']))); expect($entries[0]->value)->toBe(4000); $aggregates = Pulse::ignore(fn () => DB::table('pulse_aggregates')->orderBy('type')->orderBy('period')->orderBy('aggregate')->get()); @@ -279,7 +279,7 @@ expect($aggregates[0]->type)->toBe('slow_request'); expect($aggregates[0]->aggregate)->toBe('count'); expect($aggregates[0]->key)->toBe(json_encode(['POST', '/test-route', 'via /livewire/update'])); - expect($aggregates[0]->key_hash)->toBe(hex2bin(md5(json_encode(['POST', '/test-route', 'via /livewire/update'])))); + expect($aggregates[0]->key_hash)->toBe(keyHash(json_encode(['POST', '/test-route', 'via /livewire/update']))); expect($aggregates[0]->value)->toBe('1.00'); expect($aggregates[1]->bucket)->toBe(946782240); @@ -287,7 +287,7 @@ expect($aggregates[1]->type)->toBe('slow_request'); expect($aggregates[1]->aggregate)->toBe('max'); expect($aggregates[1]->key)->toBe(json_encode(['POST', '/test-route', 'via /livewire/update'])); - expect($aggregates[1]->key_hash)->toBe(hex2bin(md5(json_encode(['POST', '/test-route', 'via /livewire/update'])))); + expect($aggregates[1]->key_hash)->toBe(keyHash(json_encode(['POST', '/test-route', 'via /livewire/update']))); expect($aggregates[1]->value)->toBe('4000.00'); expect($aggregates[2]->bucket)->toBe(946782000); @@ -295,7 +295,7 @@ expect($aggregates[2]->type)->toBe('slow_request'); expect($aggregates[2]->aggregate)->toBe('count'); expect($aggregates[2]->key)->toBe(json_encode(['POST', '/test-route', 'via /livewire/update'])); - expect($aggregates[2]->key_hash)->toBe(hex2bin(md5(json_encode(['POST', '/test-route', 'via /livewire/update'])))); + expect($aggregates[2]->key_hash)->toBe(keyHash(json_encode(['POST', '/test-route', 'via /livewire/update']))); expect($aggregates[2]->value)->toBe('1.00'); expect($aggregates[3]->bucket)->toBe(946782000); @@ -303,7 +303,7 @@ expect($aggregates[3]->type)->toBe('slow_request'); expect($aggregates[3]->aggregate)->toBe('max'); expect($aggregates[3]->key)->toBe(json_encode(['POST', '/test-route', 'via /livewire/update'])); - expect($aggregates[3]->key_hash)->toBe(hex2bin(md5(json_encode(['POST', '/test-route', 'via /livewire/update'])))); + expect($aggregates[3]->key_hash)->toBe(keyHash(json_encode(['POST', '/test-route', 'via /livewire/update']))); expect($aggregates[3]->value)->toBe('4000.00'); expect($aggregates[4]->bucket)->toBe(946781280); @@ -311,7 +311,7 @@ expect($aggregates[4]->type)->toBe('slow_request'); expect($aggregates[4]->aggregate)->toBe('count'); expect($aggregates[4]->key)->toBe(json_encode(['POST', '/test-route', 'via /livewire/update'])); - expect($aggregates[4]->key_hash)->toBe(hex2bin(md5(json_encode(['POST', '/test-route', 'via /livewire/update'])))); + expect($aggregates[4]->key_hash)->toBe(keyHash(json_encode(['POST', '/test-route', 'via /livewire/update']))); expect($aggregates[4]->value)->toBe('1.00'); expect($aggregates[5]->bucket)->toBe(946781280); @@ -319,7 +319,7 @@ expect($aggregates[5]->type)->toBe('slow_request'); expect($aggregates[5]->aggregate)->toBe('max'); expect($aggregates[5]->key)->toBe(json_encode(['POST', '/test-route', 'via /livewire/update'])); - expect($aggregates[5]->key_hash)->toBe(hex2bin(md5(json_encode(['POST', '/test-route', 'via /livewire/update'])))); + expect($aggregates[5]->key_hash)->toBe(keyHash(json_encode(['POST', '/test-route', 'via /livewire/update']))); expect($aggregates[5]->value)->toBe('4000.00'); expect($aggregates[6]->bucket)->toBe(946774080); @@ -327,7 +327,7 @@ expect($aggregates[6]->type)->toBe('slow_request'); expect($aggregates[6]->aggregate)->toBe('count'); expect($aggregates[6]->key)->toBe(json_encode(['POST', '/test-route', 'via /livewire/update'])); - expect($aggregates[6]->key_hash)->toBe(hex2bin(md5(json_encode(['POST', '/test-route', 'via /livewire/update'])))); + expect($aggregates[6]->key_hash)->toBe(keyHash(json_encode(['POST', '/test-route', 'via /livewire/update']))); expect($aggregates[6]->value)->toBe('1.00'); expect($aggregates[7]->bucket)->toBe(946774080); @@ -335,7 +335,7 @@ expect($aggregates[7]->type)->toBe('slow_request'); expect($aggregates[7]->aggregate)->toBe('max'); expect($aggregates[7]->key)->toBe(json_encode(['POST', '/test-route', 'via /livewire/update'])); - expect($aggregates[7]->key_hash)->toBe(hex2bin(md5(json_encode(['POST', '/test-route', 'via /livewire/update'])))); + expect($aggregates[7]->key_hash)->toBe(keyHash(json_encode(['POST', '/test-route', 'via /livewire/update']))); expect($aggregates[7]->value)->toBe('4000.00'); Pulse::ignore(fn () => expect(DB::table('pulse_values')->count())->toBe(0)); diff --git a/tests/Feature/Storage/DatabaseStorageTest.php b/tests/Feature/Storage/DatabaseStorageTest.php index 3b783317..93fc05c4 100644 --- a/tests/Feature/Storage/DatabaseStorageTest.php +++ b/tests/Feature/Storage/DatabaseStorageTest.php @@ -2,8 +2,82 @@ use Carbon\CarbonInterval; use Illuminate\Support\Carbon; +use Illuminate\Support\Facades\DB; use Laravel\Pulse\Facades\Pulse; +it('combines duplicate count aggregates before upserting', function () { + $queries = collect(); + DB::listen(fn ($query) => $queries[] = $query); + + Pulse::record('type', 'key1')->count(); + Pulse::record('type', 'key1')->count(); + Pulse::record('type', 'key1')->count(); + Pulse::record('type', 'key2')->count(); + Pulse::store(); + + expect($queries)->toHaveCount(2); + expect($queries[0]->sql)->toContain('pulse_entries'); + expect($queries[1]->sql)->toContain('pulse_aggregates'); + expect($queries[0]->bindings)->toHaveCount(4 * 4); // 4 entries, 4 columns each + expect($queries[1]->bindings)->toHaveCount(2 * 6 * 4); // 2 entries, 6 columns each, 4 periods + + $aggregates = Pulse::ignore(fn () => DB::table('pulse_aggregates')->where('period', 60)->orderBy('key')->pluck('value', 'key')); + expect($aggregates['key1'])->toEqual(3); + expect($aggregates['key2'])->toEqual(1); +}); + +it('combines duplicate max aggregates before upserting', function () { + $queries = collect(); + DB::listen(fn ($query) => $queries[] = $query); + + Pulse::record('type', 'key1', 100)->max(); + Pulse::record('type', 'key1', 300)->max(); + Pulse::record('type', 'key1', 200)->max(); + Pulse::record('type', 'key2', 100)->max(); + Pulse::store(); + + expect($queries)->toHaveCount(2); + expect($queries[0]->sql)->toContain('pulse_entries'); + expect($queries[1]->sql)->toContain('pulse_aggregates'); + expect($queries[0]->bindings)->toHaveCount(4 * 4); // 4 entries, 4 columns each + expect($queries[1]->bindings)->toHaveCount(2 * 6 * 4); // 2 entries, 6 columns each, 4 periods + + $aggregates = Pulse::ignore(fn () => DB::table('pulse_aggregates')->where('period', 60)->orderBy('key')->pluck('value', 'key')); + expect($aggregates['key1'])->toEqual(300); + expect($aggregates['key2'])->toEqual(100); +}); + +it('combines duplicate average aggregates before upserting', function () { + $queries = collect(); + DB::listen(fn ($query) => $queries[] = $query); + + Pulse::record('type', 'key1', 100)->avg(); + Pulse::record('type', 'key1', 300)->avg(); + Pulse::record('type', 'key1', 200)->avg(); + Pulse::record('type', 'key2', 100)->avg(); + Pulse::store(); + + expect($queries)->toHaveCount(2); + expect($queries[0]->sql)->toContain('pulse_entries'); + expect($queries[1]->sql)->toContain('pulse_aggregates'); + expect($queries[0]->bindings)->toHaveCount(4 * 4); // 4 entries, 4 columns each + expect($queries[1]->bindings)->toHaveCount(2 * 7 * 4); // 2 entries, 7 columns each, 4 periods + + $aggregates = Pulse::ignore(fn () => DB::table('pulse_aggregates')->where('period', 60)->orderBy('key')->get())->keyBy('key'); + expect($aggregates['key1']->value)->toEqual(200); + expect($aggregates['key2']->value)->toEqual(100); + expect($aggregates['key1']->count)->toEqual(3); + expect($aggregates['key2']->count)->toEqual(1); + + Pulse::record('type', 'key1', 400)->avg(); + Pulse::record('type', 'key1', 400)->avg(); + Pulse::record('type', 'key1', 400)->avg(); + Pulse::store(); + $aggregate = Pulse::ignore(fn () => DB::table('pulse_aggregates')->where('period', 60)->where('key', 'key1')->first()); + expect($aggregate->count)->toEqual(6); + expect($aggregate->value)->toEqual(300); +}); + test('one or more aggregates for a single type', function () { /* | key | max | avg | count | diff --git a/tests/Pest.php b/tests/Pest.php index 505e902a..d6a17a1b 100644 --- a/tests/Pest.php +++ b/tests/Pest.php @@ -4,6 +4,7 @@ use Illuminate\Database\Eloquent\Model; use Illuminate\Support\Collection; use Illuminate\Support\Facades\Config; +use Illuminate\Support\Facades\DB; use Illuminate\Support\Facades\Event; use Illuminate\Support\Facades\Gate; use Illuminate\Support\Facades\Process; @@ -11,6 +12,7 @@ use Illuminate\Support\Str; use Laravel\Pulse\Facades\Pulse; use PHPUnit\Framework\Assert; +use Ramsey\Uuid\Uuid; use Tests\TestCase; /* @@ -68,7 +70,7 @@ 'type' => $type, 'aggregate' => $aggregate, 'key' => $key, - 'key_hash' => hex2bin(md5($key)), + 'key_hash' => keyHash($key), 'value' => $value, 'count' => $count, ]; @@ -91,6 +93,14 @@ | */ +function keyHash(string $string): string +{ + return match (DB::connection()->getDriverName()) { + 'mysql' => hex2bin(md5($string)), + 'pgsql' => Uuid::fromString(md5($string)), + }; +} + function prependListener(string $event, callable $listener): void { $listeners = Event::getRawListeners()[$event]; diff --git a/tests/TestCase.php b/tests/TestCase.php index 2de70fbd..08867bda 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -2,6 +2,7 @@ namespace Tests; +use Illuminate\Contracts\Config\Repository; use Illuminate\Foundation\Testing\RefreshDatabase; use Orchestra\Testbench\TestCase as OrchestraTestCase; @@ -22,4 +23,11 @@ protected function defineDatabaseMigrations(): void { $this->loadMigrationsFrom(__DIR__.'/migrations'); } + + protected function defineEnvironment($app): void + { + tap($app['config'], function (Repository $config) { + $config->set('queue.failed.driver', 'null'); + }); + } }