Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented subprocesses for indexing huge indices #79

Merged
merged 1 commit into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/Command/BaseCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,16 @@

abstract class BaseCommand extends AbstractCommand
{
protected const COMMAND_NAMESPACE = 'valantic:elastica-bridge:';
protected function displayThrowable(\Throwable $throwable): void
{
$this->output->writeln('');
$this->output->writeln(sprintf('In %s line %d', $throwable->getFile(), $throwable->getLine()));
$this->output->writeln('');

$this->output->writeln($throwable->getMessage());
$this->output->writeln('');

$this->output->writeln($throwable->getTraceAsString());
$this->output->writeln('');
}
}
3 changes: 2 additions & 1 deletion src/Command/Cleanup.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ConfirmationQuestion;
use Valantic\ElasticaBridgeBundle\Constant\CommandConstants;
use Valantic\ElasticaBridgeBundle\Elastica\Client\ElasticsearchClient;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;

Expand All @@ -27,7 +28,7 @@ public function __construct(

protected function configure(): void
{
$this->setName(self::COMMAND_NAMESPACE . 'cleanup')
$this->setName(CommandConstants::COMMAND_CLEANUP)
->setDescription('Deletes Elasticsearch indices and aliases known to (i.e. created by) the bundle')
->addOption(
self::OPTION_ALL_IN_CLUSTER,
Expand Down
166 changes: 166 additions & 0 deletions src/Command/DoPopulateIndex.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
<?php

declare(strict_types=1);

namespace Valantic\ElasticaBridgeBundle\Command;

use Elastica\Index as ElasticaIndex;
use Pimcore\Model\Element\AbstractElement;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Valantic\ElasticaBridgeBundle\Constant\CommandConstants;
use Valantic\ElasticaBridgeBundle\Exception\Command\DocumentFailedException;
use Valantic\ElasticaBridgeBundle\Index\IndexInterface;
use Valantic\ElasticaBridgeBundle\Repository\ConfigurationRepository;
use Valantic\ElasticaBridgeBundle\Repository\DocumentRepository;
use Valantic\ElasticaBridgeBundle\Repository\IndexRepository;
use Valantic\ElasticaBridgeBundle\Service\DocumentHelper;

class DoPopulateIndex extends BaseCommand
{
public function __construct(
private readonly IndexRepository $indexRepository,
private readonly DocumentRepository $documentRepository,
private readonly DocumentHelper $documentHelper,
private readonly ConfigurationRepository $configurationRepository,
) {
parent::__construct();
}

protected function configure(): void
{
$this->setName(CommandConstants::COMMAND_DO_POPULATE_INDEX)
->setHidden(true)
->setDescription('[INTERNAL]')
->addOption(CommandConstants::OPTION_CONFIG, mode: InputOption::VALUE_REQUIRED)
->addOption(CommandConstants::OPTION_INDEX, mode: InputOption::VALUE_REQUIRED)
->addOption(CommandConstants::OPTION_BATCH_NUMBER, mode: InputOption::VALUE_REQUIRED)
->addOption(CommandConstants::OPTION_LISTING_COUNT, mode: InputOption::VALUE_REQUIRED)
->addOption(CommandConstants::OPTION_DOCUMENT, mode: InputOption::VALUE_REQUIRED)
;
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$indexConfig = $this->getIndex();

if (!$indexConfig instanceof IndexInterface) {
return self::FAILURE;
}

return $this->populateIndex($indexConfig, $indexConfig->getBlueGreenInactiveElasticaIndex());
}

private function getIndex(): ?IndexInterface
{
foreach ($this->indexRepository->flattenedAll() as $indexConfig) {
if ($indexConfig->getName() === $this->input->getOption(CommandConstants::OPTION_CONFIG)) {
return $indexConfig;
}
}

return null;
}

private function populateIndex(IndexInterface $indexConfig, ElasticaIndex $esIndex): int
{
ProgressBar::setFormatDefinition('custom', "%percent%%\t%remaining%\t%memory%\n%message%");

$batchNumber = (int) $this->input->getOption(CommandConstants::OPTION_BATCH_NUMBER);
$listingCount = (int) $this->input->getOption(CommandConstants::OPTION_LISTING_COUNT);

$allowedDocuments = $indexConfig->getAllowedDocuments();
$document = $this->input->getOption(CommandConstants::OPTION_DOCUMENT);

if (!in_array($document, $allowedDocuments, true)) {
return self::FAILURE;
}

$progressBar = new ProgressBar($this->output, $listingCount > 0 ? $listingCount : 1);
$progressBar->setMessage($document);
$progressBar->setFormat('custom');
$progressBar->setProgress($batchNumber * $indexConfig->getBatchSize());

if (!$indexConfig->shouldPopulateInSubprocesses()) {
$numberOfBatches = ceil($listingCount / $indexConfig->getBatchSize());

for ($batch = 0; $batch < $numberOfBatches; $batch++) {
$exitCode = $this->doPopulateIndex($esIndex, $indexConfig, $progressBar, $document, $batch);

if ($exitCode !== self::SUCCESS) {
return $exitCode;
}
limenet marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
return $this->doPopulateIndex($esIndex, $indexConfig, $progressBar, $document, $batchNumber);
}

return self::SUCCESS;
}

private function doPopulateIndex(
ElasticaIndex $esIndex,
IndexInterface $indexConfig,
ProgressBar $progressBar,
string $document,
int $batchNumber,
): int {
$documentInstance = $this->documentRepository->get($document);
limenet marked this conversation as resolved.
Show resolved Hide resolved

$this->documentHelper->setTenantIfNeeded($documentInstance, $indexConfig);

$batchSize = $indexConfig->getBatchSize();

$listing = $documentInstance->getListingInstance($indexConfig);
$listing->setOffset($batchNumber * $batchSize);
$listing->setLimit($batchSize);

$esDocuments = [];

foreach ($listing->getData() ?? [] as $dataObject) {
try {
if (!$documentInstance->shouldIndex($dataObject)) {
continue;
}
$progressBar->advance();

$esDocuments[] = $this->documentHelper->elementToDocument($documentInstance, $dataObject);
} catch (\Throwable $throwable) {
$this->displayDocumentError($indexConfig, $document, $dataObject, $throwable);

if (!$this->configurationRepository->shouldSkipFailingDocuments()) {
throw new DocumentFailedException($throwable);
}
}
}

if (count($esDocuments) > 0) {
$esIndex->addDocuments($esDocuments);
$esDocuments = [];
}

if ($indexConfig->refreshIndexAfterEveryDocumentWhenPopulating()) {
$esIndex->refresh();
}

return self::SUCCESS;
}

private function displayDocumentError(
IndexInterface $indexConfig,
string $document,
AbstractElement $dataObject,
\Throwable $throwable,
): void {
$this->output->writeln('');
$this->output->writeln(sprintf(
'<fg=red;options=bold>Error while populating index %s, processing documents of type %s, last processed element ID %s.</>',
$indexConfig::class,
$document,
$dataObject->getId()
));
$this->displayThrowable($throwable);
}
}
9 changes: 5 additions & 4 deletions src/Command/Index.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Process\Process;
use Valantic\ElasticaBridgeBundle\Constant\CommandConstants;
use Valantic\ElasticaBridgeBundle\Elastica\Client\ElasticsearchClient;
use Valantic\ElasticaBridgeBundle\Enum\IndexBlueGreenSuffix;
use Valantic\ElasticaBridgeBundle\Exception\Index\BlueGreenIndicesIncorrectlySetupException;
Expand Down Expand Up @@ -39,7 +40,7 @@ public function __construct(

protected function configure(): void
{
$this->setName(self::COMMAND_NAMESPACE . 'index')
$this->setName(CommandConstants::COMMAND_INDEX)
->setDescription('Ensures all the indices are present and populated.')
->addArgument(
self::ARGUMENT_INDEX,
Expand Down Expand Up @@ -169,9 +170,9 @@ private function populateIndex(IndexInterface $indexConfig, ElasticaIndex $esInd
self::$isPopulating = true;
$process = new Process(
[
'bin/console', self::COMMAND_NAMESPACE . 'populate-index',
'--config', $indexConfig->getName(),
'--index', $esIndex->getName(),
'bin/console', CommandConstants::COMMAND_POPULATE_INDEX,
'--' . CommandConstants::OPTION_CONFIG, $indexConfig->getName(),
'--' . CommandConstants::OPTION_INDEX, $esIndex->getName(),
...array_filter([$this->output->isVerbose() ? '-v' : null,
$this->output->isVeryVerbose() ? '-vv' : null,
$this->output->isDebug() ? '-vvv' : null,
Expand Down
Loading
Loading