Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Commands/CheckCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public function handle(
while (true) {
$now = new CarbonImmutable();

if ($cache->get('laravel:pulse:restart') !== $lastRestart) {
if ($lastRestart !== $cache->get('laravel:pulse:restart')) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pint did this in the GH workflow.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... it almost feels like it created a Yoda rather than removing it, but maybe my understanding of Yodas is incomplete.

return self::SUCCESS;
}

Expand Down
2 changes: 1 addition & 1 deletion src/Commands/WorkCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public function handle(
while (true) {
$now = new CarbonImmutable;

if ($cache->get('laravel:pulse:restart') !== $lastRestart) {
if ($lastRestart !== $cache->get('laravel:pulse:restart')) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pint did this in the GH workflow.

return self::SUCCESS;
}

Expand Down
15 changes: 0 additions & 15 deletions src/Entries/SlowJobFinished.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

namespace Laravel\Pulse\Entries;

use Illuminate\Database\Connection;

/**
* @internal
*/
Expand All @@ -19,19 +17,6 @@ public function __construct(
//
}

/**
* Perform the update.
*/
public function perform(Connection $db): void
{
$db->table($this->table())
->where('job_uuid', $this->jobUuid)
->update([
'slowest' => $db->raw("COALESCE(GREATEST(`slowest`,{$this->duration}),{$this->duration})"),
'slow' => $db->raw('`slow` + 1'),
]);
}

/**
* The update's table.
*/
Expand Down
7 changes: 0 additions & 7 deletions src/Entries/Update.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,13 @@

namespace Laravel\Pulse\Entries;

use Illuminate\Database\Connection;

abstract class Update
{
/**
* The update's table.
*/
abstract public function table(): string;

/**
* Perform the update.
*/
abstract public function perform(Connection $db): void;

/**
* Resolve the update for ingest and storage.
*/
Expand Down
2 changes: 1 addition & 1 deletion src/Ingests/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ protected function trimAfter(): Interval
/**
* Get the redis connection.
*/
protected function connection(): RedisAdapter
public function connection(): RedisAdapter
{
return new RedisAdapter($this->config, $this->manager->connection(
$this->config->get('pulse.ingest.redis.connection')
Expand Down
2 changes: 2 additions & 0 deletions src/PulseServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public function register(): void
}

$this->app->singleton(Pulse::class);
$this->app->singleton(DatabaseStorage::class);
$this->app->singleton(RedisIngest::class);

$this->app->bind(Storage::class, DatabaseStorage::class);

Expand Down
51 changes: 49 additions & 2 deletions src/Storage/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,26 @@

use Carbon\CarbonImmutable;
use Carbon\CarbonInterval as Interval;
use Closure;
use Illuminate\Config\Repository;
use Illuminate\Database\Connection;
use Illuminate\Database\DatabaseManager;
use Illuminate\Support\Collection;
use Illuminate\Support\Traits\ReflectsClosures;
use Laravel\Pulse\Contracts\Storage;
use Laravel\Pulse\Entries\Entry;
use Laravel\Pulse\Entries\SlowJobFinished;
use Laravel\Pulse\Entries\Update;

class Database implements Storage
{
use ReflectsClosures;

/**
* Additional storage update handlers.
*/
protected array $updateHandlers = [];

/**
* Create a new Database storage instance.
*/
Expand All @@ -24,6 +34,20 @@ public function __construct(
//
}

/**
* Handle the update using the closure.
*
* @param (callable(Update): void) $callback
*/
public function handleUpdateUsing($callback): self
{
foreach ($this->firstClosureParameterTypes(Closure::fromCallable($callback)) as $class) {
$this->updateHandlers[$class] = $callback;
}

return $this;
}

/**
* Store the entries and updates.
*
Expand All @@ -43,7 +67,30 @@ public function store(Collection $items): void
->map(fn (Collection $inserts) => $inserts->pluck('attributes')->all())
->each($this->connection()->table($table)->insert(...)));

$updates->each(fn (Update $update) => $update->perform($this->connection()));
$this->perform($updates);
});
}

/**
* Perform the given updates.
*/
protected function perform($updates)
{
$updates->each(function (Update $update) {
if ($this->updateHandlers[$update::class] ?? false) {
$this->updateHandlers[$update::class]($update);

return;
}

if ($update instanceof SlowJobFinished) {
$this->connection()->table($update->table())
->where('job_uuid', $update->jobUuid)
->update([
'slowest' => $this->connection()->raw("COALESCE(GREATEST(`slowest`,{$update->duration}),{$update->duration})"),
'slow' => $this->connection()->raw('`slow` + 1'),
]);
}
});
}

Expand All @@ -69,7 +116,7 @@ protected function trimAfter(): Interval
/**
* Get the database connection.
*/
protected function connection(): Connection
public function connection(): Connection
{
return $this->manager->connection(
$this->config->get('pulse.storage.database.connection') ?? $this->config->get('database.default')
Expand Down
41 changes: 41 additions & 0 deletions tests/Feature/DatabaseStorageTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

use Illuminate\Support\Facades\DB;
use Laravel\Pulse\Entries\Entry;
use Laravel\Pulse\Entries\SlowJobFinished;
use Laravel\Pulse\Storage\Database;

it('performs slow job updates', function () {
$storage = App::make(Database::class);
$update = new SlowJobFinished('job-uuid', 321);

$storage->store(collect([
new Entry('pulse_jobs', [
'date' => now()->toDateTimeString(),
'job' => 'MyJob',
'job_uuid' => 'job-uuid',
'user_id' => '55',
]),
]));

$storage->store(collect([
new SlowJobFinished('job-uuid', 456),
]));

$jobs = DB::table('pulse_jobs')->get();
$this->assertCount(1, $jobs);
$this->assertSame(1, $jobs[0]->slow);
$this->assertSame(456, $jobs[0]->slowest);
});

it('allows custom update handlers', function () {
$captured = null;
App::make(Database::class)->handleUpdateUsing(function (SlowJobFinished $update) use (&$captured) {
$captured = $update;
});

$update = new SlowJobFinished('job-uuid', 321);
App::make(Database::class)->store(collect([$update])); // resolve again to ensure that the handlers persist across different resolves from container.

$this->assertSame($update, $captured);
});