Skip to content

Commit

Permalink
fix numbers of workers and make newly created processes after startup…
Browse files Browse the repository at this point in the history
… of the daemon detectable
  • Loading branch information
sj-i committed Jun 29, 2020
1 parent c9ff989 commit f21da20
Show file tree
Hide file tree
Showing 17 changed files with 329 additions and 94 deletions.
72 changes: 36 additions & 36 deletions src/Command/Inspector/DaemonCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@

use Amp\Loop;
use Amp\Promise;
use PhpProfiler\Inspector\Daemon\Dispatcher\DispatchTable;
use PhpProfiler\Inspector\Daemon\Dispatcher\Message\TraceMessage;
use PhpProfiler\Inspector\Daemon\Dispatcher\WorkerPool;
use PhpProfiler\Inspector\Daemon\Reader\Context\PhpReaderContext;
use PhpProfiler\Inspector\Daemon\Reader\Context\PhpReaderContextCreator;
use PhpProfiler\Inspector\Daemon\Searcher\Context\PhpSearcherContextCreator;
use PhpProfiler\Inspector\Settings\DaemonSettings;
Expand Down Expand Up @@ -119,31 +120,25 @@ public function execute(InputInterface $input, OutputInterface $output): int
$searcher_context = $this->php_searcher_context_creator->create();
Promise\wait($searcher_context->start());
Promise\wait($searcher_context->sendTargetRegex($daemon_settings->target_regex));
$pid_list = Promise\wait($searcher_context->receivePidList());

$worker_pool = WorkerPool::create($this->php_reader_context_creator, $daemon_settings->threads);
$worker_pool = WorkerPool::create(
$this->php_reader_context_creator,
$daemon_settings->threads,
$target_php_settings,
$loop_settings,
$get_trace_settings
);

$dispatch_table = new DispatchTable(
$worker_pool,
$target_php_settings,
$loop_settings,
$get_trace_settings
);

$readers = [];
foreach ($pid_list as $target_pid) {
$reader_context = $worker_pool->getFreeWorker();
if (is_null($reader_context)) {
continue;
}
Promise\wait(
$reader_context->sendSettings(
[
$target_pid,
$target_php_settings,
$loop_settings,
$get_trace_settings
]
)
);
$readers[$target_pid] = $reader_context;
}
exec('stty -icanon -echo');

Loop::run(function () use (&$readers, $output) {
Loop::run(function () use ($dispatch_table, $searcher_context, $worker_pool, $output) {
Loop::onReadable(
STDIN,
/** @param resource $stream */
Expand All @@ -155,24 +150,29 @@ function (string $watcher_id, $stream) {
}
}
);
Loop::repeat(10, function () use (&$readers, $output) {
/** @var array<int, PhpReaderContext> $readers */

Loop::repeat(10, function () use ($dispatch_table, $searcher_context, $worker_pool, $output) {
$promises = [];
static $searcher_on_read = false;
if (!$searcher_on_read) {
$promises[] = \Amp\call(function () use ($searcher_context, $dispatch_table, &$searcher_on_read) {
$searcher_on_read = true;
$update_target_message = yield $searcher_context->receivePidList();
$dispatch_table->updateTargets($update_target_message->target_process_list);
$searcher_on_read = false;
});
}
$readers = $worker_pool->getReadableWorkers();
foreach ($readers as $pid => $reader) {
if (!$reader->isRunning()) {
/** @psalm-suppress MixedArrayAccess*/
unset($readers[$pid]);
continue;
}
$promises[] = \Amp\call(
function () use ($reader, &$readers, $pid, $output) {
/** @psalm-suppress MixedArrayAccess*/
unset($readers[$pid]);
/** @var string $result */
function () use ($reader, $pid, $worker_pool, $dispatch_table, $output) {
$worker_pool->setOnRead($pid);
$result = yield $reader->receiveTrace();
$output->write($result);
$readers[$pid] = $reader;
$worker_pool->releaseOnRead($pid);
if ($result instanceof TraceMessage) {
$output->write($result->trace);
} else {
$dispatch_table->releaseOne($result->pid);
}
}
);
}
Expand Down
18 changes: 10 additions & 8 deletions src/Inspector/Daemon/Dispatcher/DispatchTable.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,22 @@ public function updateTargets(TargetProcessList $update): void
}
$this->assigned->putOne($picked);
$this->dispatch_table[$picked] = $worker;
$worker->sendSettings([
$picked,
$this->target_php_settings,
$this->trace_loop_settings,
$this->get_trace_settings
]);
$worker->sendAttach($picked);
}
}

public function release(TargetProcessList $targets): void
{
foreach ($targets->getArray() as $pid) {
$this->dispatch_table[$pid]->sendQuit();
unset($this->dispatch_table[$pid]);
$this->releaseOne($pid);
}
}

public function releaseOne(int $pid): void
{
$worker = $this->dispatch_table[$pid];
$this->worker_pool->returnWorkerToPool($worker);
unset($this->dispatch_table[$pid]);
$this->assigned = $this->assigned->getDiff(new TargetProcessList($pid));
}
}
24 changes: 24 additions & 0 deletions src/Inspector/Daemon/Dispatcher/Message/DetachWorkerMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

/**
* This file is part of the sj-i/php-profiler package.
*
* (c) sji <sji@sj-i.dev>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace PhpProfiler\Inspector\Daemon\Dispatcher\Message;

final class DetachWorkerMessage
{
public int $pid;

public function __construct(int $pid)
{
$this->pid = $pid;
}
}
24 changes: 24 additions & 0 deletions src/Inspector/Daemon/Dispatcher/Message/TraceMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

/**
* This file is part of the sj-i/php-profiler package.
*
* (c) sji <sji@sj-i.dev>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace PhpProfiler\Inspector\Daemon\Dispatcher\Message;

final class TraceMessage
{
public string $trace;

public function __construct(string $trace)
{
$this->trace = $trace;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

/**
* This file is part of the sj-i/php-profiler package.
*
* (c) sji <sji@sj-i.dev>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace PhpProfiler\Inspector\Daemon\Dispatcher\Message;

use PhpProfiler\Inspector\Daemon\Dispatcher\TargetProcessList;

final class UpdateTargetProcessMessage
{
public TargetProcessList $target_process_list;

public function __construct(TargetProcessList $target_process_list)
{
$this->target_process_list = $target_process_list;
}
}
48 changes: 46 additions & 2 deletions src/Inspector/Daemon/Dispatcher/WorkerPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
use PhpProfiler\Inspector\Daemon\Reader\Context\PhpReaderContext;
use PhpProfiler\Inspector\Daemon\Reader\Context\PhpReaderContextCreator;
use Amp\Promise;
use PhpProfiler\Inspector\Settings\GetTraceSettings;
use PhpProfiler\Inspector\Settings\TargetPhpSettings;
use PhpProfiler\Inspector\Settings\TraceLoopSettings;

final class WorkerPool
{
Expand All @@ -25,14 +28,23 @@ final class WorkerPool
/** @var array<int, bool> */
private array $is_free_list;

/** @var array<int, bool> */
private array $on_read_list;

public function __construct(PhpReaderContext ...$contexts)
{
$this->contexts = $contexts;
$this->is_free_list = array_fill(0, count($contexts), true);
$this->on_read_list = array_fill(0, count($contexts), false);
}

public static function create(PhpReaderContextCreator $creator, int $number): self
{
public static function create(
PhpReaderContextCreator $creator,
int $number,
TargetPhpSettings $target_php_settings,
TraceLoopSettings $loop_settings,
GetTraceSettings $get_trace_settings
): self {
$contexts = [];
$started = [];
for ($i = 0; $i < $number; $i++) {
Expand All @@ -41,6 +53,16 @@ public static function create(PhpReaderContextCreator $creator, int $number): se
$contexts[] = $context;
}
Promise\wait(Promise\all($started));
$send_settings = [];
for ($i = 0; $i < $number; $i++) {
$send_settings[] = $contexts[$i]->sendSettings(
$target_php_settings,
$loop_settings,
$get_trace_settings
);
}
Promise\wait(Promise\all($send_settings));

return new self(...$contexts);
}

Expand All @@ -55,6 +77,18 @@ public function getFreeWorker(): ?PhpReaderContext
return null;
}

/**
* @return iterable<int, PhpReaderContext>
*/
public function getReadableWorkers(): iterable
{
foreach ($this->contexts as $key => $context) {
if (!$this->is_free_list[$key] and !$this->on_read_list[$key]) {
yield $key => $context;
}
}
}

public function returnWorkerToPool(PhpReaderContext $context_to_return): void
{
foreach ($this->contexts as $key => $context) {
Expand All @@ -63,4 +97,14 @@ public function returnWorkerToPool(PhpReaderContext $context_to_return): void
}
}
}

public function setOnRead(int $pid): void
{
$this->on_read_list[$pid] = true;
}

public function releaseOnRead(int $pid): void
{
$this->on_read_list[$pid] = false;
}
}
44 changes: 30 additions & 14 deletions src/Inspector/Daemon/Reader/Context/PhpReaderContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

use Amp\Parallel\Context;
use Amp\Promise;
use PhpProfiler\Inspector\Settings\DaemonSettings;
use PhpProfiler\Inspector\Daemon\Dispatcher\Message\DetachWorkerMessage;
use PhpProfiler\Inspector\Daemon\Dispatcher\Message\TraceMessage;
use PhpProfiler\Inspector\Daemon\Reader\Message\AttachMessage;
use PhpProfiler\Inspector\Daemon\Reader\Message\SetSettingsMessage;
use PhpProfiler\Inspector\Settings\GetTraceSettings;
use PhpProfiler\Inspector\Settings\TargetPhpSettings;
use PhpProfiler\Inspector\Settings\TraceLoopSettings;
Expand All @@ -35,24 +38,36 @@ public function start(): Promise
}

/**
* @param array{
* 0: int,
* 1: TargetPhpSettings,
* 2: TraceLoopSettings,
* 3: GetTraceSettings
* } $array
* @param TargetPhpSettings $target_php_settings
* @param TraceLoopSettings $loop_settings
* @param GetTraceSettings $get_trace_settings
* @return Promise<int>
*/
public function sendSettings(array $array): Promise
{
public function sendSettings(
TargetPhpSettings $target_php_settings,
TraceLoopSettings $loop_settings,
GetTraceSettings $get_trace_settings
): Promise {
/** @var Promise<int> */
return $this->context->send($array);
return $this->context->send(
new SetSettingsMessage(
$target_php_settings,
$loop_settings,
$get_trace_settings
)
);
}

public function sendQuit(): Promise
/**
* @param int $pid
* @return Promise<int>
*/
public function sendAttach(int $pid): Promise
{
/** @var Promise<int> */
return $this->context->send(null);
return $this->context->send(
new AttachMessage($pid)
);
}

public function isRunning(): bool
Expand All @@ -61,11 +76,12 @@ public function isRunning(): bool
}

/**
/* @return Promise<string>
* @return Promise<TraceMessage|DetachWorkerMessage>
* @psalm-yield Promise<TraceMessage|DetachWorkerMessage>
*/
public function receiveTrace(): Promise
{
/** @psalm-yield Promise<string> */
/** @var Promise<TraceMessage|DetachWorkerMessage> */
return $this->context->receive();
}
}
Loading

0 comments on commit f21da20

Please sign in to comment.