Skip to content

Commit

Permalink
Merge pull request #325 from reliforp/auto-context-recovering
Browse files Browse the repository at this point in the history
Automatically respawn workers on the daemon mode
  • Loading branch information
sj-i committed Nov 19, 2023
2 parents 30faa69 + ef4ea05 commit 44cb2bc
Show file tree
Hide file tree
Showing 13 changed files with 689 additions and 90 deletions.
86 changes: 86 additions & 0 deletions src/Inspector/Daemon/AutoContextRecovering.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?php

/**
* This file is part of the reliforp/reli-prof package.
*
* (c) sji <sji@sj-i.dev>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Reli\Inspector\Daemon;

use Amp\Parallel\Context\ContextException;
use Reli\Lib\Amphp\ContextInterface;
use Reli\Lib\Amphp\MessageProtocolInterface;
use Reli\Lib\Log\Log;

/**
* @template T of MessageProtocolInterface
* @implements AutoContextRecoveringInterface<T>
*/
final class AutoContextRecovering implements AutoContextRecoveringInterface
{
private const RETRY_COUNT = 10;

/** @var ContextInterface<T>|null */
private ?ContextInterface $context = null;

private ?\Closure $on_recover_callback = null;

/**
* @param \Closure():ContextInterface<T> $context_factory
*/
public function __construct(
private readonly \Closure $context_factory,
) {
}

public function recreateContext(): void
{
if (!is_null($this->context) and $this->context->isRunning()) {
$this->context->stop();
}
$this->context = null;
}

public function getContext(): ContextInterface
{
if (is_null($this->context)) {
$this->context = $this->context_factory->__invoke();
if (!is_null($this->on_recover_callback)) {
$this->on_recover_callback->__invoke();
}
}
return $this->context;
}

public function withAutoRecover(
\Closure $callback,
string $log_message_on_retry,
): mixed {
for ($i = 0; $i < self::RETRY_COUNT; $i++) {
try {
return $callback($this->getContext()->getProtocol());
} catch (ContextException $e) {
Log::info(
$log_message_on_retry,
[
'exception' => $e,
]
);
$this->recreateContext();
}
}
assert($e instanceof \Throwable);
throw $e;
}

public function onRecover(\Closure $callback): void
{
$this->on_recover_callback = $callback;
}
}
39 changes: 39 additions & 0 deletions src/Inspector/Daemon/AutoContextRecoveringInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

/**
* This file is part of the reliforp/reli-prof package.
*
* (c) sji <sji@sj-i.dev>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Reli\Inspector\Daemon;

use Reli\Lib\Amphp\ContextInterface;
use Reli\Lib\Amphp\MessageProtocolInterface;

/** @template T of MessageProtocolInterface */
interface AutoContextRecoveringInterface
{
public function recreateContext(): void;

/** @return ContextInterface<T> */
public function getContext(): ContextInterface;

/**
* @template TReturn
* @param \Closure(T):TReturn $callback
* @return TReturn
*/
public function withAutoRecover(
\Closure $callback,
string $log_message_on_retry,
): mixed;

/** @param \Closure():void $callback */
public function onRecover(\Closure $callback): void;
}
11 changes: 7 additions & 4 deletions src/Inspector/Daemon/Reader/Context/PhpReaderContextCreator.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Reli\Inspector\Daemon\Reader\Context;

use Reli\Inspector\Daemon\AutoContextRecovering;
use Reli\Inspector\Daemon\Reader\Controller\PhpReaderController;
use Reli\Inspector\Daemon\Reader\Controller\PhpReaderControllerInterface;
use Reli\Inspector\Daemon\Reader\Controller\PhpReaderControllerProtocol;
Expand All @@ -30,10 +31,12 @@ public function __construct(
public function create(): PhpReaderControllerInterface
{
return new PhpReaderController(
$this->context_creator->create(
PhpReaderEntryPoint::class,
PhpReaderWorkerProtocol::class,
PhpReaderControllerProtocol::class
new AutoContextRecovering(
fn () => $this->context_creator->create(
PhpReaderEntryPoint::class,
PhpReaderWorkerProtocol::class,
PhpReaderControllerProtocol::class
)
)
);
}
Expand Down
68 changes: 55 additions & 13 deletions src/Inspector/Daemon/Reader/Controller/PhpReaderController.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Reli\Inspector\Daemon\Reader\Controller;

use Reli\Inspector\Daemon\AutoContextRecoveringInterface;
use Reli\Inspector\Daemon\Dispatcher\TargetProcessDescriptor;
use Reli\Inspector\Daemon\Reader\Protocol\Message\DetachWorkerMessage;
use Reli\Inspector\Daemon\Reader\Protocol\Message\TraceMessage;
Expand All @@ -21,47 +22,88 @@
use Reli\Inspector\Daemon\Reader\Protocol\PhpReaderControllerProtocolInterface;
use Reli\Inspector\Settings\GetTraceSettings\GetTraceSettings;
use Reli\Inspector\Settings\TraceLoopSettings\TraceLoopSettings;
use Reli\Lib\Amphp\ContextInterface;

final class PhpReaderController implements PhpReaderControllerInterface
{
/** @param ContextInterface<PhpReaderControllerProtocolInterface> $context */
private ?SetSettingsMessage $settings_already_sent = null;
private ?AttachMessage $attach_already_sent = null;

/**
* @param AutoContextRecoveringInterface<PhpReaderControllerProtocol> $auto_context_recovering
*/
public function __construct(
private ContextInterface $context
private AutoContextRecoveringInterface $auto_context_recovering,
) {
$this->auto_context_recovering->onRecover(
function () {
if ($this->settings_already_sent !== null) {
$this->auto_context_recovering
->getContext()
->getProtocol()
->sendSettings($this->settings_already_sent)
;
}
if ($this->attach_already_sent !== null) {
$this->auto_context_recovering
->getContext()
->getProtocol()
->sendAttach($this->attach_already_sent)
;
}
}
);
}

public function start(): void
{
$this->context->start();
$this->auto_context_recovering->getContext()->start();
}

public function isRunning(): bool
{
return $this->context->isRunning();
return $this->auto_context_recovering->getContext()->isRunning();
}

public function sendSettings(
TraceLoopSettings $loop_settings,
GetTraceSettings $get_trace_settings
): void {
$this->context->getProtocol()->sendSettings(
new SetSettingsMessage(
$loop_settings,
$get_trace_settings
)
$settings_message = new SetSettingsMessage(
$loop_settings,
$get_trace_settings
);
$this->auto_context_recovering->withAutoRecover(
function (PhpReaderControllerProtocolInterface $protocol) use ($settings_message) {
$protocol->sendSettings($settings_message);
$this->settings_already_sent = $settings_message;
},
'failed on sending settings to worker'
);
}

public function sendAttach(TargetProcessDescriptor $process_descriptor): void
{
$this->context->getProtocol()->sendAttach(
new AttachMessage($process_descriptor)
$attach_message = new AttachMessage($process_descriptor);
$this->auto_context_recovering->withAutoRecover(
function (PhpReaderControllerProtocolInterface $protocol) use ($attach_message) {
$protocol->sendAttach($attach_message);
$this->attach_already_sent = $attach_message;
},
'failed on attaching worker'
);
}

public function receiveTraceOrDetachWorker(): TraceMessage|DetachWorkerMessage
{
return $this->context->getProtocol()->receiveTraceOrDetachWorker();
return $this->auto_context_recovering->withAutoRecover(
function (PhpReaderControllerProtocolInterface $protocol): TraceMessage|DetachWorkerMessage {
$message = $protocol->receiveTraceOrDetachWorker();
if ($message instanceof DetachWorkerMessage) {
$this->attach_already_sent = null;
}
return $message;
},
'failed to receive trace or detach worker'
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

namespace Reli\Inspector\Daemon\Searcher\Context;

use Reli\Inspector\Daemon\AutoContextRecovering;
use Reli\Inspector\Daemon\Searcher\Controller\PhpSearcherController;
use Reli\Inspector\Daemon\Searcher\Controller\PhpSearcherControllerInterface;
use Reli\Inspector\Daemon\Searcher\Controller\PhpSearcherControllerProtocol;
Expand All @@ -30,10 +31,12 @@ public function __construct(
public function create(): PhpSearcherControllerInterface
{
return new PhpSearcherController(
$this->context_creator->create(
PhpSearcherEntryPoint::class,
PhpSearcherWorkerProtocol::class,
PhpSearcherControllerProtocol::class
new AutoContextRecovering(
fn () => $this->context_creator->create(
PhpSearcherEntryPoint::class,
PhpSearcherWorkerProtocol::class,
PhpSearcherControllerProtocol::class
)
)
);
}
Expand Down
49 changes: 35 additions & 14 deletions src/Inspector/Daemon/Searcher/Controller/PhpSearcherController.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,36 @@

namespace Reli\Inspector\Daemon\Searcher\Controller;

use Reli\Inspector\Daemon\AutoContextRecoveringInterface;
use Reli\Inspector\Daemon\Searcher\Protocol\Message\TargetPhpSettingsMessage;
use Reli\Inspector\Daemon\Searcher\Protocol\Message\UpdateTargetProcessMessage;
use Reli\Inspector\Daemon\Searcher\Protocol\PhpSearcherControllerProtocolInterface;
use Reli\Inspector\Settings\TargetPhpSettings\TargetPhpSettings;
use Reli\Lib\Amphp\ContextInterface;

final class PhpSearcherController implements PhpSearcherControllerInterface
{
/** @param ContextInterface<PhpSearcherControllerProtocolInterface> $context */
private ?TargetPhpSettingsMessage $settings_already_sent = null;

/** @param AutoContextRecoveringInterface<PhpSearcherControllerProtocol> $auto_context_recovering */
public function __construct(
private ContextInterface $context
readonly private AutoContextRecoveringInterface $auto_context_recovering
) {
$this->auto_context_recovering->onRecover(
function (): void {
if ($this->settings_already_sent !== null) {
$this->auto_context_recovering
->getContext()
->getProtocol()
->sendTargetRegex($this->settings_already_sent)
;
}
}
);
}

public function start(): void
{
$this->context->start();
$this->auto_context_recovering->getContext()->start();
}

/** @param non-empty-string $regex */
Expand All @@ -38,19 +51,27 @@ public function sendTarget(
TargetPhpSettings $target_php_settings,
int $pid,
): void {
$this->context->getProtocol()
->sendTargetRegex(
new TargetPhpSettingsMessage(
$regex,
$target_php_settings,
$pid
)
)
;
$message = new TargetPhpSettingsMessage(
$regex,
$target_php_settings,
$pid
);
$this->auto_context_recovering->withAutoRecover(
function (PhpSearcherControllerProtocolInterface $protocol) use ($message) {
$protocol->sendTargetRegex($message);
},
'failed to send target',
);
$this->settings_already_sent = $message;
}

public function receivePidList(): UpdateTargetProcessMessage
{
return $this->context->getProtocol()->receiveUpdateTargetProcess();
return $this->auto_context_recovering->withAutoRecover(
function (PhpSearcherControllerProtocolInterface $protocol) {
return $protocol->receiveUpdateTargetProcess();
},
'failed to receive pid list',
);
}
}
5 changes: 5 additions & 0 deletions src/Lib/Amphp/Context.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public function isRunning(): bool
return !$this->amphp_context->isClosed();
}

public function stop(): void
{
$this->amphp_context->close();
}

/** @return T */
public function getProtocol(): object
{
Expand Down
2 changes: 2 additions & 0 deletions src/Lib/Amphp/ContextInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ interface ContextInterface
{
public function start(): void;

public function stop(): void;

public function isRunning(): bool;

/** @return T */
Expand Down
Loading

0 comments on commit 44cb2bc

Please sign in to comment.