Skip to content

Commit

Permalink
[DX] Merge WorkerRunner to WorkerCommand, to make context closer (#4972)
Browse files Browse the repository at this point in the history
  • Loading branch information
TomasVotruba committed Sep 10, 2023
1 parent e4eec89 commit 0c5ad10
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 127 deletions.
11 changes: 5 additions & 6 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
"webmozart/assert": "^1.11"
},
"require-dev": {
"cweagans/composer-patches": "^1.7.2",
"nategood/httpful": "^0.3.2",
"phpstan/extension-installer": "^1.3",
"phpstan/phpstan-deprecation-rules": "^1.1",
Expand All @@ -53,17 +52,17 @@
"phpstan/phpstan-webmozart-assert": "^1.2.2",
"phpunit/phpunit": "^10.1",
"rector/phpstan-rules": "^0.7.1",
"rector/rector-generator": "dev-main",
"rector/rector-generator": "^0.7",
"robiningelbrecht/phpunit-pretty-print": "^1.2.2",
"spatie/enum": "^3.13",
"symplify/easy-ci": "^11.1.18",
"symplify/easy-coding-standard": "^12.0.5",
"symplify/easy-ci": "^11.3",
"symplify/easy-coding-standard": "^12.0",
"symplify/phpstan-extensions": "^11.2",
"symplify/phpstan-rules": "^12.1.4",
"symplify/phpstan-rules": "^12.2",
"symplify/rule-doc-generator": "^12.0",
"symplify/vendor-patches": "^11.2",
"tomasvotruba/class-leak": "^0.1",
"tomasvotruba/cognitive-complexity": "^0.1",
"tomasvotruba/cognitive-complexity": "^0.2",
"tomasvotruba/lines": "^0.5",
"tomasvotruba/type-coverage": "^0.2",
"tomasvotruba/unused-public": "^0.3",
Expand Down
82 changes: 0 additions & 82 deletions packages/Parallel/WorkerRunner.php

This file was deleted.

71 changes: 36 additions & 35 deletions src/Application/ApplicationFileProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,27 @@ public function run(Configuration $configuration, InputInterface $input): Proces

$this->configureCustomErrorHandler();

/**
* Mimic @see https://github.com/phpstan/phpstan-src/blob/ab154e1da54d42fec751e17a1199b3e07591e85e/src/Command/AnalyseApplication.php#L188C23-L244
*/
if ($configuration->shouldShowProgressBar()) {
$fileCount = count($filePaths);
$this->symfonyStyle->progressStart($fileCount);
$this->symfonyStyle->progressAdvance(0);

$postFileCallback = function (int $stepCount): void {
$this->symfonyStyle->progressAdvance($stepCount);
// running in parallel here → nothing else to do
};
} else {
$postFileCallback = static function (int $stepCount): void {
};
}

if ($configuration->isParallel()) {
$processResult = $this->runParallel($filePaths, $configuration, $input);
$processResult = $this->runParallel($filePaths, $input, $postFileCallback);
} else {
$processResult = $this->processFiles($filePaths, $configuration, false);
$processResult = $this->processFiles($filePaths, $configuration, $postFileCallback);
}

$processResult->addSystemErrors($this->systemErrors);
Expand All @@ -79,18 +96,13 @@ public function run(Configuration $configuration, InputInterface $input): Proces

/**
* @param string[] $filePaths
* @param callable(int $fileCount): void|null $postFileCallback
*/
public function processFiles(array $filePaths, Configuration $configuration, bool $isParallel = true): ProcessResult
{
$shouldShowProgressBar = $configuration->shouldShowProgressBar();

// progress bar on parallel handled on runParallel()
if (! $isParallel && $shouldShowProgressBar) {
$fileCount = count($filePaths);
$this->symfonyStyle->progressStart($fileCount);
$this->symfonyStyle->progressAdvance(0);
}

public function processFiles(
array $filePaths,
Configuration $configuration,
?callable $postFileCallback = null
): ProcessResult {
/** @var SystemError[] $systemErrors */
$systemErrors = [];

Expand Down Expand Up @@ -118,10 +130,9 @@ public function processFiles(array $filePaths, Configuration $configuration, boo

$collectedData = array_merge($collectedData, $fileProcessResult->getCollectedData());

// progress bar +1,
// progress bar on parallel handled on runParallel()
if (! $isParallel && $shouldShowProgressBar) {
$this->symfonyStyle->progressAdvance();
if (is_callable($postFileCallback)) {
$postFileCallback(1);
}
} catch (Throwable $throwable) {
$this->changedFilesDetector->invalidateFile($filePath);
Expand Down Expand Up @@ -196,30 +207,20 @@ private function restoreErrorHandler(): void

/**
* @param string[] $filePaths
* @param callable(int $stepCount): void $postFileCallback
*/
private function runParallel(array $filePaths, Configuration $configuration, InputInterface $input): ProcessResult
{
private function runParallel(
array $filePaths,
InputInterface $input,
callable $postFileCallback
): ProcessResult {
$schedule = $this->scheduleFactory->create(
$this->cpuCoreCountProvider->provide(),
SimpleParameterProvider::provideIntParameter(Option::PARALLEL_JOB_SIZE),
SimpleParameterProvider::provideIntParameter(Option::PARALLEL_MAX_NUMBER_OF_PROCESSES),
$filePaths
);

if ($configuration->shouldShowProgressBar()) {
$fileCount = count($filePaths);
$this->symfonyStyle->progressStart($fileCount);
$this->symfonyStyle->progressAdvance(0);

$postFileCallback = function (int $stepCount): void {
$this->symfonyStyle->progressAdvance($stepCount);
// running in parallel here → nothing else to do
};
} else {
$postFileCallback = static function (int $stepCount): void {
};
}

$mainScript = $this->resolveCalledRectorBinary();
if ($mainScript === null) {
throw new ParallelShouldNotHappenException('[parallel] Main script was not found');
Expand All @@ -239,11 +240,11 @@ private function resolveCalledRectorBinary(): ?string
return null;
}

$potentialEcsBinaryPath = $_SERVER[self::ARGV][0];
if (! file_exists($potentialEcsBinaryPath)) {
$potentialRectorBinaryPath = $_SERVER[self::ARGV][0];
if (! file_exists($potentialRectorBinaryPath)) {
return null;
}

return $potentialEcsBinaryPath;
return $potentialRectorBinaryPath;
}
}
70 changes: 66 additions & 4 deletions src/Console/Command/WorkerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@
use React\EventLoop\StreamSelectLoop;
use React\Socket\ConnectionInterface;
use React\Socket\TcpConnector;
use Rector\Core\Application\ApplicationFileProcessor;
use Rector\Core\Configuration\ConfigurationFactory;
use Rector\Core\Console\ProcessConfigureDecorator;
use Rector\Core\StaticReflection\DynamicSourceLocatorDecorator;
use Rector\Core\Util\MemoryLimiter;
use Rector\Parallel\WorkerRunner;
use Rector\Core\ValueObject\Configuration;
use Rector\Core\ValueObject\Error\SystemError;
use Rector\Parallel\ValueObject\Bridge;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symplify\EasyParallel\Enum\Action;
use Symplify\EasyParallel\Enum\ReactCommand;
use Symplify\EasyParallel\Enum\ReactEvent;
use Throwable;

/**
* Inspired at: https://github.com/phpstan/phpstan-src/commit/9124c66dcc55a222e21b1717ba5f60771f7dda92
Expand All @@ -28,8 +34,14 @@
*/
final class WorkerCommand extends Command
{
/**
* @var string
*/
private const RESULT = 'result';

public function __construct(
private readonly WorkerRunner $workerRunner,
private readonly DynamicSourceLocatorDecorator $dynamicSourceLocatorDecorator,
private readonly ApplicationFileProcessor $applicationFileProcessor,
private readonly MemoryLimiter $memoryLimiter,
private readonly ConfigurationFactory $configurationFactory
) {
Expand Down Expand Up @@ -61,17 +73,67 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$inDecoder = new Decoder($connection, true, 512, JSON_INVALID_UTF8_IGNORE);
$outEncoder = new Encoder($connection, JSON_INVALID_UTF8_IGNORE);

// handshake?
$outEncoder->write([
ReactCommand::ACTION => Action::HELLO,
ReactCommand::IDENTIFIER => $parallelIdentifier,
]);

$this->workerRunner->run($outEncoder, $inDecoder, $configuration);
$this->runWorker($outEncoder, $inDecoder, $configuration);
});

$streamSelectLoop->run();

return self::SUCCESS;
}

private function runWorker(Encoder $encoder, Decoder $decoder, Configuration $configuration): void
{
$this->dynamicSourceLocatorDecorator->addPaths($configuration->getPaths());

// 1. handle system error
$handleErrorCallback = static function (Throwable $throwable) use ($encoder): void {
$systemError = new SystemError($throwable->getMessage(), $throwable->getFile(), $throwable->getLine());

$encoder->write([
ReactCommand::ACTION => Action::RESULT,
self::RESULT => [
Bridge::SYSTEM_ERRORS => [$systemError],
Bridge::FILES_COUNT => 0,
Bridge::SYSTEM_ERRORS_COUNT => 1,
],
]);
$encoder->end();
};

$encoder->on(ReactEvent::ERROR, $handleErrorCallback);

// 2. collect diffs + errors from file processor
$decoder->on(ReactEvent::DATA, function (array $json) use ($encoder, $configuration): void {
$action = $json[ReactCommand::ACTION];
if ($action !== Action::MAIN) {
return;
}

/** @var string[] $filePaths */
$filePaths = $json[Bridge::FILES] ?? [];

$processResult = $this->applicationFileProcessor->processFiles($filePaths, $configuration);

/**
* this invokes all listeners listening $decoder->on(...) @see \Symplify\EasyParallel\Enum\ReactEvent::DATA
*/
$encoder->write([
ReactCommand::ACTION => Action::RESULT,
self::RESULT => [
Bridge::FILE_DIFFS => $processResult->getFileDiffs(),
Bridge::FILES_COUNT => count($filePaths),
Bridge::SYSTEM_ERRORS => $processResult->getSystemErrors(),
Bridge::SYSTEM_ERRORS_COUNT => count($processResult->getSystemErrors()),
Bridge::COLLECTED_DATA => $processResult->getCollectedData(),
],
]);
});

$decoder->on(ReactEvent::ERROR, $handleErrorCallback);
}
}

0 comments on commit 0c5ad10

Please sign in to comment.