diff --git a/.composer.json b/.composer.json index 6f7f5947917..0cd8cff5364 100644 --- a/.composer.json +++ b/.composer.json @@ -42,7 +42,6 @@ "@test:behat-cli -c Neos.ContentRepository.LegacyNodeMigration/Tests/Behavior/behat.yml.dist", "@test:behat-cli -c Neos.ContentRepository.Export/Tests/Behavior/behat.yml.dist", "@test:behat-cli -c Neos.TimeableNodeVisibility/Tests/Behavior/behat.yml.dist", - "../../flow doctrine:migrate --quiet; ../../flow cr:setup", "@test:behat-cli -c Neos.Neos/Tests/Behavior/behat.yml" ], "test:behavioral:stop-on-failure": [ @@ -51,7 +50,6 @@ "@test:behat-cli -vvv --stop-on-failure -c Neos.ContentRepository.LegacyNodeMigration/Tests/Behavior/behat.yml.dist", "@test:behat-cli -vvv --stop-on-failure -c Neos.ContentRepository.Export/Tests/Behavior/behat.yml.dist", "@test:behat-cli -vvv --stop-on-failure -c Neos.TimeableNodeVisibility/Tests/Behavior/behat.yml.dist", - "../../flow doctrine:migrate --quiet; ../../flow cr:setup", "@test:behat-cli -vvv --stop-on-failure -c Neos.Neos/Tests/Behavior/behat.yml" ], "test": [ diff --git a/Neos.ContentGraph.DoctrineDbalAdapter/composer.json b/Neos.ContentGraph.DoctrineDbalAdapter/composer.json index 6ef1a4dfca2..f7d1c899c28 100644 --- a/Neos.ContentGraph.DoctrineDbalAdapter/composer.json +++ b/Neos.ContentGraph.DoctrineDbalAdapter/composer.json @@ -10,8 +10,7 @@ "license": "GPL-3.0+", "require": { "neos/contentrepository-core": "self.version", - "doctrine/dbal": "^2.13", - "doctrine/migrations": "*" + "neos/contentrepository-dbaltools": "*" }, "autoload": { "psr-4": { diff --git a/Neos.ContentGraph.DoctrineDbalAdapter/src/DoctrineDbalContentGraphProjection.php b/Neos.ContentGraph.DoctrineDbalAdapter/src/DoctrineDbalContentGraphProjection.php index 1476c4fd42a..91d7debfae2 100644 --- a/Neos.ContentGraph.DoctrineDbalAdapter/src/DoctrineDbalContentGraphProjection.php +++ b/Neos.ContentGraph.DoctrineDbalAdapter/src/DoctrineDbalContentGraphProjection.php @@ -6,7 +6,6 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Schema\AbstractSchemaManager; -use Doctrine\DBAL\Types\Types; use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature\NodeMove; use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature\NodeRemoval; use Neos\ContentGraph\DoctrineDbalAdapter\Domain\Projection\Feature\NodeVariation; @@ -46,7 +45,6 @@ use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage; use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff; use Neos\ContentRepository\Core\NodeType\NodeTypeName; -use Neos\ContentRepository\Core\Projection\CheckpointStorageStatusType; use Neos\ContentRepository\Core\Projection\ContentGraph\NodeTags; use Neos\ContentRepository\Core\Projection\ContentGraph\Timestamps; use Neos\ContentRepository\Core\Projection\ProjectionInterface; @@ -56,6 +54,8 @@ use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId; use Neos\ContentRepository\Core\SharedModel\Node\NodeName; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; +use Neos\ContentRepository\DbalTools\CheckpointHelper; +use Neos\ContentRepository\DbalTools\DbalSchemaDiff; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; @@ -73,8 +73,6 @@ final class DoctrineDbalContentGraphProjection implements ProjectionInterface, W public const RELATION_DEFAULT_OFFSET = 128; - private DbalCheckpointStorage $checkpointStorage; - public function __construct( private readonly Connection $dbal, private readonly ProjectionContentGraph $projectionContentGraph, @@ -82,11 +80,6 @@ public function __construct( private readonly DimensionSpacePointsRepository $dimensionSpacePointsRepository, private readonly ContentGraphFinder $contentGraphFinder ) { - $this->checkpointStorage = new DbalCheckpointStorage( - $this->dbal, - $this->tableNames->checkpoint(), - self::class - ); } protected function getProjectionContentGraph(): ProjectionContentGraph @@ -97,9 +90,8 @@ protected function getProjectionContentGraph(): ProjectionContentGraph public function setUp(): void { foreach ($this->determineRequiredSqlStatements() as $statement) { - $this->getDatabaseConnection()->executeStatement($statement); + $this->dbal->executeStatement($statement); } - $this->checkpointStorage->setUp(); } /** @@ -117,15 +109,8 @@ private function determineRequiredSqlStatements(): array public function status(): ProjectionStatus { - $checkpointStorageStatus = $this->checkpointStorage->status(); - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) { - return ProjectionStatus::error($checkpointStorageStatus->details); - } - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) { - return ProjectionStatus::setupRequired($checkpointStorageStatus->details); - } try { - $this->getDatabaseConnection()->connect(); + $this->dbal->connect(); } catch (\Throwable $e) { return ProjectionStatus::error(sprintf('Failed to connect to database: %s', $e->getMessage())); } @@ -137,57 +122,33 @@ public function status(): ProjectionStatus if ($requiredSqlStatements !== []) { return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements))); } + try { + $this->getCheckpoint(); + } catch (\Exception $exception) { + return ProjectionStatus::error('Error while retrieving checkpoint: ' . $exception->getMessage()); + } return ProjectionStatus::ok(); } public function reset(): void - { - $this->truncateDatabaseTables(); - - $this->checkpointStorage->acquireLock(); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::none()); - $this->getState()->forgetInstances(); - } - - public function markStale(): void - { - $this->getState()->forgetInstances(); - } - - private function truncateDatabaseTables(): void { $this->dbal->executeQuery('TRUNCATE table ' . $this->tableNames->node()); $this->dbal->executeQuery('TRUNCATE table ' . $this->tableNames->hierarchyRelation()); $this->dbal->executeQuery('TRUNCATE table ' . $this->tableNames->referenceRelation()); $this->dbal->executeQuery('TRUNCATE table ' . $this->tableNames->dimensionSpacePoints()); + CheckpointHelper::resetCheckpoint($this->dbal, $this->tableNames->checkpoint()); + $this->getState()->forgetInstances(); } - public function canHandle(EventInterface $event): bool + public function markStale(): void { - return in_array($event::class, [ - RootNodeAggregateWithNodeWasCreated::class, - RootNodeAggregateDimensionsWereUpdated::class, - NodeAggregateWithNodeWasCreated::class, - NodeAggregateNameWasChanged::class, - ContentStreamWasForked::class, - ContentStreamWasRemoved::class, - NodePropertiesWereSet::class, - NodeReferencesWereSet::class, - NodeAggregateTypeWasChanged::class, - DimensionSpacePointWasMoved::class, - DimensionShineThroughWasAdded::class, - NodeAggregateWasRemoved::class, - NodeAggregateWasMoved::class, - NodeSpecializationVariantWasCreated::class, - NodeGeneralizationVariantWasCreated::class, - NodePeerVariantWasCreated::class, - SubtreeWasTagged::class, - SubtreeWasUntagged::class, - ]); + $this->getState()->forgetInstances(); } + public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void { + $this->dbal->beginTransaction(); match ($event::class) { RootNodeAggregateWithNodeWasCreated::class => $this->whenRootNodeAggregateWithNodeWasCreated($event, $eventEnvelope), RootNodeAggregateDimensionsWereUpdated::class => $this->whenRootNodeAggregateDimensionsWereUpdated($event), @@ -207,13 +168,15 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void NodePeerVariantWasCreated::class => $this->whenNodePeerVariantWasCreated($event, $eventEnvelope), SubtreeWasTagged::class => $this->whenSubtreeWasTagged($event), SubtreeWasUntagged::class => $this->whenSubtreeWasUntagged($event), - default => throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))), + default => null, }; + CheckpointHelper::updateCheckpoint($this->dbal, $this->tableNames->checkpoint(), $eventEnvelope->sequenceNumber); + $this->dbal->commit(); } - public function getCheckpointStorage(): DbalCheckpointStorage + public function getCheckpoint(): SequenceNumber { - return $this->checkpointStorage; + return CheckpointHelper::getCheckpoint($this->dbal, $this->tableNames->checkpoint()); } public function getState(): ContentGraphFinder @@ -272,13 +235,13 @@ private function whenRootNodeAggregateDimensionsWereUpdated(RootNodeAggregateDim } // delete all hierarchy edges of the root node - $this->getDatabaseConnection()->executeUpdate(' - DELETE FROM ' . $this->tableNames->hierarchyRelation() . ' - WHERE - parentnodeanchor = :parentNodeAnchor - AND childnodeanchor = :childNodeAnchor - AND contentstreamid = :contentStreamId - ', [ + $this->dbal->executeUpdate(' + DELETE FROM ' . $this->tableNames->hierarchyRelation() . ' + WHERE + parentnodeanchor = :parentNodeAnchor + AND childnodeanchor = :childNodeAnchor + AND contentstreamid = :contentStreamId + ', [ 'parentNodeAnchor' => NodeRelationAnchorPoint::forRootEdge()->value, 'childNodeAnchor' => $rootNodeAnchorPoint->value, 'contentStreamId' => $event->contentStreamId->value, @@ -353,7 +316,7 @@ private function createNodeWithHierarchy( EventEnvelope $eventEnvelope, ): void { $node = NodeRecord::createNewInDatabase( - $this->getDatabaseConnection(), + $this->dbal, $this->tableNames, $nodeAggregateId, $originDimensionSpacePoint->jsonSerialize(), @@ -442,7 +405,7 @@ private function connectHierarchy( $inheritedSubtreeTags, ); - $hierarchyRelation->addToDatabase($this->getDatabaseConnection(), $this->tableNames); + $hierarchyRelation->addToDatabase($this->dbal, $this->tableNames); } } @@ -529,7 +492,7 @@ private function getRelationPositionAfterRecalculation( $position = $offset; $offset += self::RELATION_DEFAULT_OFFSET; } - $relation->assignNewPosition($offset, $this->getDatabaseConnection(), $this->tableNames); + $relation->assignNewPosition($offset, $this->dbal, $this->tableNames); } return $position; @@ -543,10 +506,11 @@ private function whenContentStreamWasForked(ContentStreamWasForked $event): void // // 1) Copy HIERARCHY RELATIONS (this is the MAIN OPERATION here) // - $this->getDatabaseConnection()->executeUpdate(' + $this->dbal->executeUpdate(' INSERT INTO ' . $this->tableNames->hierarchyRelation() . ' ( parentnodeanchor, childnodeanchor, + position, dimensionspacepointhash, subtreetags, @@ -555,6 +519,7 @@ private function whenContentStreamWasForked(ContentStreamWasForked $event): void SELECT h.parentnodeanchor, h.childnodeanchor, + h.position, h.dimensionspacepointhash, h.subtreetags, @@ -565,7 +530,6 @@ private function whenContentStreamWasForked(ContentStreamWasForked $event): void ', [ 'sourceContentStreamId' => $event->sourceContentStreamId->value ]); - // NOTE: as reference edges are attached to Relation Anchor Points (and they are lazily copy-on-written), // we do not need to copy reference edges here (but we need to do it during copy on write). } diff --git a/Neos.ContentGraph.DoctrineDbalAdapter/src/DoctrineDbalContentGraphSchemaBuilder.php b/Neos.ContentGraph.DoctrineDbalAdapter/src/DoctrineDbalContentGraphSchemaBuilder.php index e6bc7fc028e..02dab89a18f 100644 --- a/Neos.ContentGraph.DoctrineDbalAdapter/src/DoctrineDbalContentGraphSchemaBuilder.php +++ b/Neos.ContentGraph.DoctrineDbalAdapter/src/DoctrineDbalContentGraphSchemaBuilder.php @@ -8,7 +8,8 @@ use Doctrine\DBAL\Schema\Table; use Doctrine\DBAL\Types\Type; use Doctrine\DBAL\Types\Types; -use Neos\ContentRepository\Core\Infrastructure\DbalSchemaFactory; +use Neos\ContentRepository\DbalTools\CheckpointHelper; +use Neos\ContentRepository\DbalTools\DbalSchemaFactory; /** * @internal @@ -28,7 +29,8 @@ public function buildSchema(AbstractSchemaManager $schemaManager): Schema $this->createNodeTable(), $this->createHierarchyRelationTable(), $this->createReferenceRelationTable(), - $this->createDimensionSpacePointsTable() + $this->createDimensionSpacePointsTable(), + CheckpointHelper::checkpointTableSchema($this->contentGraphTableNames->checkpoint()), ]); } diff --git a/Neos.ContentGraph.DoctrineDbalAdapter/src/Domain/Repository/ProjectionContentGraph.php b/Neos.ContentGraph.DoctrineDbalAdapter/src/Domain/Repository/ProjectionContentGraph.php index 22e858a201c..4e53ced9c73 100644 --- a/Neos.ContentGraph.DoctrineDbalAdapter/src/Domain/Repository/ProjectionContentGraph.php +++ b/Neos.ContentGraph.DoctrineDbalAdapter/src/Domain/Repository/ProjectionContentGraph.php @@ -26,7 +26,6 @@ use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet; use Neos\ContentRepository\Core\DimensionSpace\OriginDimensionSpacePoint; use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId; -use Neos\ContentRepository\Core\SharedModel\Node\NodeName; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; /** diff --git a/Neos.ContentGraph.PostgreSQLAdapter/composer.json b/Neos.ContentGraph.PostgreSQLAdapter/composer.json index d76c1831bd8..a8976fdc257 100644 --- a/Neos.ContentGraph.PostgreSQLAdapter/composer.json +++ b/Neos.ContentGraph.PostgreSQLAdapter/composer.json @@ -11,7 +11,7 @@ "require": { "neos/contentrepository-core": "self.version", "doctrine/dbal": "*", - "doctrine/migrations": "*" + "neos/contentrepository-dbaltools": "*" }, "autoload": { "psr-4": { diff --git a/Neos.ContentGraph.PostgreSQLAdapter/src/Domain/Projection/HypergraphProjection.php b/Neos.ContentGraph.PostgreSQLAdapter/src/Domain/Projection/HypergraphProjection.php index 2de68648353..f8bd9ee49f8 100644 --- a/Neos.ContentGraph.PostgreSQLAdapter/src/Domain/Projection/HypergraphProjection.php +++ b/Neos.ContentGraph.PostgreSQLAdapter/src/Domain/Projection/HypergraphProjection.php @@ -41,11 +41,10 @@ use Neos\ContentRepository\Core\Feature\RootNodeCreation\Event\RootNodeAggregateWithNodeWasCreated; use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasTagged; use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasUntagged; -use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage; -use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff; -use Neos\ContentRepository\Core\Projection\CheckpointStorageStatusType; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\ProjectionStatus; +use Neos\ContentRepository\DbalTools\CheckpointHelper; +use Neos\ContentRepository\DbalTools\DbalSchemaDiff; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; @@ -67,7 +66,6 @@ final class HypergraphProjection implements ProjectionInterface use NodeTypeChange; use NodeVariation; - private DbalCheckpointStorage $checkpointStorage; private ProjectionHypergraph $projectionHypergraph; public function __construct( @@ -76,11 +74,6 @@ public function __construct( private readonly ContentGraphFinder $contentGraphFinder ) { $this->projectionHypergraph = new ProjectionHypergraph($this->dbal, $this->tableNamePrefix); - $this->checkpointStorage = new DbalCheckpointStorage( - $this->dbal, - $this->tableNamePrefix . '_checkpoint', - self::class - ); } @@ -98,20 +91,12 @@ public function setUp(): void create index if not exists restriction_affected on ' . $this->tableNamePrefix . '_restrictionhyperrelation using gin (affectednodeaggregateids); '); - $this->checkpointStorage->setUp(); } public function status(): ProjectionStatus { - $checkpointStorageStatus = $this->checkpointStorage->status(); - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) { - return ProjectionStatus::error($checkpointStorageStatus->details); - } - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) { - return ProjectionStatus::setupRequired($checkpointStorageStatus->details); - } try { - $this->getDatabaseConnection()->connect(); + $this->dbal->connect(); } catch (\Throwable $e) { return ProjectionStatus::error(sprintf('Failed to connect to database: %s', $e->getMessage())); } @@ -123,6 +108,11 @@ public function status(): ProjectionStatus if ($requiredSqlStatements !== []) { return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements))); } + try { + $this->getCheckpoint(); + } catch (\Exception $exception) { + return ProjectionStatus::error('Error while retrieving checkpoint: ' . $exception->getMessage()); + } return ProjectionStatus::ok(); } @@ -145,8 +135,7 @@ public function reset(): void { $this->truncateDatabaseTables(); - $this->checkpointStorage->acquireLock(); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::none()); + CheckpointHelper::resetCheckpoint($this->dbal, $this->tableNamePrefix . '_checkpoint'); } private function truncateDatabaseTables(): void @@ -157,41 +146,9 @@ private function truncateDatabaseTables(): void $this->dbal->executeQuery('TRUNCATE table ' . $this->tableNamePrefix . '_restrictionhyperrelation'); } - public function canHandle(EventInterface $event): bool - { - return in_array($event::class, [ - // ContentStreamForking - ContentStreamWasForked::class, - // NodeCreation - RootNodeAggregateWithNodeWasCreated::class, - NodeAggregateWithNodeWasCreated::class, - // SubtreeTagging - SubtreeWasTagged::class, - SubtreeWasUntagged::class, - // NodeModification - NodePropertiesWereSet::class, - // NodeReferencing - NodeReferencesWereSet::class, - // NodeRemoval - NodeAggregateWasRemoved::class, - // NodeRenaming - NodeAggregateNameWasChanged::class, - // NodeTypeChange - NodeAggregateTypeWasChanged::class, - // NodeVariation - NodeSpecializationVariantWasCreated::class, - NodeGeneralizationVariantWasCreated::class, - NodePeerVariantWasCreated::class, - // TODO: not yet supported: - //ContentStreamWasRemoved::class, - //DimensionSpacePointWasMoved::class, - //DimensionShineThroughWasAdded::class, - //NodeAggregateWasMoved::class, - ]); - } - public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void { + $this->dbal->beginTransaction(); match ($event::class) { // ContentStreamForking ContentStreamWasForked::class => $this->whenContentStreamWasForked($event), @@ -215,13 +172,15 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void NodeSpecializationVariantWasCreated::class => $this->whenNodeSpecializationVariantWasCreated($event), NodeGeneralizationVariantWasCreated::class => $this->whenNodeGeneralizationVariantWasCreated($event), NodePeerVariantWasCreated::class => $this->whenNodePeerVariantWasCreated($event), - default => throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))), + default => null, }; + CheckpointHelper::updateCheckpoint($this->dbal, $this->tableNamePrefix . '_checkpoint', $eventEnvelope->sequenceNumber); + $this->dbal->commit(); } - public function getCheckpointStorage(): DbalCheckpointStorage + public function getCheckpoint(): SequenceNumber { - return $this->checkpointStorage; + return CheckpointHelper::getCheckpoint($this->dbal, $this->tableNamePrefix . '_checkpoint'); } public function getState(): ContentGraphFinder diff --git a/Neos.ContentGraph.PostgreSQLAdapter/src/Domain/Projection/SchemaBuilder/HypergraphSchemaBuilder.php b/Neos.ContentGraph.PostgreSQLAdapter/src/Domain/Projection/SchemaBuilder/HypergraphSchemaBuilder.php index 40176222de6..99fc31fde76 100644 --- a/Neos.ContentGraph.PostgreSQLAdapter/src/Domain/Projection/SchemaBuilder/HypergraphSchemaBuilder.php +++ b/Neos.ContentGraph.PostgreSQLAdapter/src/Domain/Projection/SchemaBuilder/HypergraphSchemaBuilder.php @@ -7,6 +7,7 @@ use Doctrine\DBAL\Types\Type; use Doctrine\DBAL\Types\Types; use Neos\ContentGraph\PostgreSQLAdapter\Domain\Projection\HypergraphProjection; +use Neos\ContentRepository\DbalTools\CheckpointHelper; /** * @internal @@ -20,7 +21,7 @@ public function __construct( public function buildSchema(): Schema { - $schema = new Schema(); + $schema = new Schema([CheckpointHelper::checkpointTableSchema($this->tableNamePrefix)]); $this->createNodeTable($schema); $this->createHierarchyHyperrelationTable($schema); diff --git a/Neos.ContentRepository.BehavioralTests/Classes/Command/PerformanceMeasurementService.php b/Neos.ContentRepository.BehavioralTests/Classes/Command/PerformanceMeasurementService.php index 21c344d2f9d..5e8caf7fb2e 100644 --- a/Neos.ContentRepository.BehavioralTests/Classes/Command/PerformanceMeasurementService.php +++ b/Neos.ContentRepository.BehavioralTests/Classes/Command/PerformanceMeasurementService.php @@ -52,7 +52,7 @@ class PerformanceMeasurementService implements ContentRepositoryServiceInterface public function __construct( private readonly EventPersister $eventPersister, private readonly ContentRepository $contentRepository, - private readonly Connection $connection, + private readonly Connection $dbal, private readonly ContentRepositoryId $contentRepositoryId ) { $this->contentStreamId = contentStreamId::fromString('cs-identifier'); @@ -73,7 +73,7 @@ public function __construct( public function removeEverything(): void { $eventTableName = DoctrineEventStoreFactory::databaseTableName($this->contentRepositoryId); - $this->connection->executeStatement('TRUNCATE ' . $this->connection->quoteIdentifier($eventTableName)); + $this->dbal->executeStatement('TRUNCATE ' . $this->dbal->quoteIdentifier($eventTableName)); $this->contentRepository->resetProjectionStates(); } @@ -96,7 +96,7 @@ public function createNodesForPerformanceTest(int $nodesPerLevel, int $levels): NodeAggregateClassification::CLASSIFICATION_ROOT, ); - $this->eventPersister->publishEvents(new EventsToPublish( + $this->eventPersister->publishEvents($this->contentRepository, new EventsToPublish( $this->contentStreamEventStream->getEventStreamName(), Events::with($rootNodeAggregateWasCreated), ExpectedVersion::ANY() @@ -106,7 +106,7 @@ public function createNodesForPerformanceTest(int $nodesPerLevel, int $levels): $sumSoFar = 0; $events = []; $this->createHierarchy($rootNodeAggregateId, 1, $levels, $nodesPerLevel, $sumSoFar, $events); - $this->eventPersister->publishEvents(new EventsToPublish( + $this->eventPersister->publishEvents($this->contentRepository, new EventsToPublish( $this->contentStreamEventStream->getEventStreamName(), Events::fromArray($events), ExpectedVersion::ANY() diff --git a/Neos.ContentRepository.BehavioralTests/Classes/Command/PerformanceMeasurementServiceFactory.php b/Neos.ContentRepository.BehavioralTests/Classes/Command/PerformanceMeasurementServiceFactory.php index 1b3c9611228..a4a9e11a16a 100644 --- a/Neos.ContentRepository.BehavioralTests/Classes/Command/PerformanceMeasurementServiceFactory.php +++ b/Neos.ContentRepository.BehavioralTests/Classes/Command/PerformanceMeasurementServiceFactory.php @@ -25,7 +25,7 @@ class PerformanceMeasurementServiceFactory implements ContentRepositoryServiceFactoryInterface { public function __construct( - private readonly Connection $connection, + private readonly Connection $dbal, ) { } @@ -35,7 +35,7 @@ public function build( return new PerformanceMeasurementService( $serviceFactoryDependencies->eventPersister, $serviceFactoryDependencies->contentRepository, - $this->connection, + $this->dbal, $serviceFactoryDependencies->contentRepositoryId ); } diff --git a/Neos.ContentRepository.BehavioralTests/Classes/ProjectionRaceConditionTester/RaceTrackerCatchUpHook.php b/Neos.ContentRepository.BehavioralTests/Classes/ProjectionRaceConditionTester/RaceTrackerCatchUpHook.php index 3309f99d17d..97ce6721535 100644 --- a/Neos.ContentRepository.BehavioralTests/Classes/ProjectionRaceConditionTester/RaceTrackerCatchUpHook.php +++ b/Neos.ContentRepository.BehavioralTests/Classes/ProjectionRaceConditionTester/RaceTrackerCatchUpHook.php @@ -18,14 +18,13 @@ use Neos\ContentRepository\BehavioralTests\ProjectionRaceConditionTester\Dto\TraceEntryType; use Neos\ContentRepository\Core\EventStore\EventInterface; use Neos\ContentRepository\Core\Projection\CatchUpHookInterface; -use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTrigger; use Neos\EventStore\Model\EventEnvelope; use Neos\Flow\Annotations as Flow; /** * We had some race conditions in projections, where {@see \Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage} was not working properly. * We saw some non-deterministic, random errors when running the tests - unluckily only on Linux, not on OSX: - * On OSX, forking a new subprocess in {@see SubprocessProjectionCatchUpTrigger} is *WAY* slower than in Linux; + * On OSX, forking a new subprocess is *WAY* slower than in Linux; * and thus the race conditions which appears if two projector instances of the same class run concurrently * won't happen (or are way less likely). * diff --git a/Neos.ContentRepository.BehavioralTests/Tests/Functional/Feature/WorkspacePublication/setup-is-running-flag b/Neos.ContentRepository.BehavioralTests/Tests/Functional/Feature/WorkspacePublication/setup-is-running-flag new file mode 100644 index 00000000000..e69de29bb2d diff --git a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandResult.php b/Neos.ContentRepository.Core/Classes/CommandHandler/CommandResult.php deleted file mode 100644 index 515309a4974..00000000000 --- a/Neos.ContentRepository.Core/Classes/CommandHandler/CommandResult.php +++ /dev/null @@ -1,27 +0,0 @@ -> $projections - * @param array $sequenceNumberPerProjection - */ - public function __construct( - public Projections $projections, - private array $sequenceNumberPerProjection, - ) { - } - - public static function empty(): self - { - return new self(Projections::empty(), []); - } - - public static function fromProjectionsAndEventsAndSequenceNumber( - Projections $allProjections, - Events $events, - SequenceNumber $highestCommittedSequenceNumber - ): self { - $sequenceNumberInteger = $highestCommittedSequenceNumber->value - $events->count() + 1; - $pendingProjectionsArray = []; - $sequenceNumberPerProjection = []; - foreach ($events as $event) { - if ($event instanceof DecoratedEvent) { - $event = $event->innerEvent; - } - foreach ($allProjections as $projection) { - if ($projection->canHandle($event)) { - $sequenceNumberPerProjection[$projection::class] = $sequenceNumberInteger; - if (!in_array($projection, $pendingProjectionsArray, true)) { - $pendingProjectionsArray[] = $projection; - } - } - } - $sequenceNumberInteger++; - } - return new self(Projections::fromArray($pendingProjectionsArray), $sequenceNumberPerProjection); - } - - /** - * @param ProjectionInterface $projection - * @return SequenceNumber - */ - public function getExpectedSequenceNumber(ProjectionInterface $projection): SequenceNumber - { - if (!array_key_exists($projection::class, $this->sequenceNumberPerProjection)) { - throw new \InvalidArgumentException( - sprintf('Projection of class "%s" is not pending', $projection::class), - 1652252976 - ); - } - return SequenceNumber::fromInteger($this->sequenceNumberPerProjection[$projection::class]); - } -} diff --git a/Neos.ContentRepository.Core/Classes/CommandHandlingDependencies.php b/Neos.ContentRepository.Core/Classes/CommandHandlingDependencies.php index c0b5270cc37..b138e9fd052 100644 --- a/Neos.ContentRepository.Core/Classes/CommandHandlingDependencies.php +++ b/Neos.ContentRepository.Core/Classes/CommandHandlingDependencies.php @@ -41,9 +41,9 @@ public function __construct(private readonly ContentRepository $contentRepositor { } - public function handle(CommandInterface $command): CommandResult + public function handle(CommandInterface $command): void { - return $this->contentRepository->handle($command); + $this->contentRepository->handle($command); } public function getWorkspaceFinder(): WorkspaceFinder diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index f6fc32cbc0e..40773e4629d 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -16,7 +16,6 @@ use Neos\ContentRepository\Core\CommandHandler\CommandBus; use Neos\ContentRepository\Core\CommandHandler\CommandInterface; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\Dimension\ContentDimensionSourceInterface; use Neos\ContentRepository\Core\DimensionSpace\InterDimensionalVariationGraph; use Neos\ContentRepository\Core\EventStore\DecoratedEvent; @@ -27,7 +26,7 @@ use Neos\ContentRepository\Core\EventStore\EventsToPublish; use Neos\ContentRepository\Core\Factory\ContentRepositoryFactory; use Neos\ContentRepository\Core\NodeType\NodeTypeManager; -use Neos\ContentRepository\Core\Projection\CatchUp; +use Neos\ContentRepository\Core\Projection\CatchUpHookInterface; use Neos\ContentRepository\Core\Projection\CatchUpOptions; use Neos\ContentRepository\Core\Projection\ContentGraph\ContentGraphInterface; use Neos\ContentRepository\Core\Projection\ContentStream\ContentStreamFinder; @@ -43,9 +42,12 @@ use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; use Neos\EventStore\EventStoreInterface; use Neos\EventStore\Model\Event\EventMetadata; +use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; use Neos\EventStore\Model\EventStream\VirtualStreamName; use Psr\Clock\ClockInterface; +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\Store\SemaphoreStore; /** * Main Entry Point to the system. Encapsulates the full event-sourced Content Repository. @@ -89,16 +91,12 @@ public function __construct( /** * The only API to send commands (mutation intentions) to the system. - * - * The system is ASYNCHRONOUS by default, so that means the projection is not directly up to date. If you - * need to be synchronous, call {@see CommandResult::block()} on the returned CommandResult - then the system - * waits until the projections are up to date. - * - * @param CommandInterface $command - * @return CommandResult + * @return object NOTE: This is just a b/c layer to avoid `handle()->block()` from failing but this will change to void with the final release! */ - public function handle(CommandInterface $command): CommandResult + public function handle(CommandInterface $command): object { + #print_r(debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS));exit; + #\Neos\Flow\var_dump('HANDLE ' . $command::class); // the commands only calculate which events they want to have published, but do not do the // publishing themselves $eventsToPublish = $this->commandBus->handle($command, $this->commandHandlingDependencies); @@ -132,7 +130,15 @@ public function handle(CommandInterface $command): CommandResult $eventsToPublish->expectedVersion, ); - return $this->eventPersister->publishEvents($eventsToPublish); + $this->eventPersister->publishEvents($this, $eventsToPublish); + return new class { + /** + * @deprecated backwards compatibility layer + */ + public function block(): void + { + } + }; } @@ -158,6 +164,67 @@ public function projectionState(string $projectionStateClassName): ProjectionSta throw new \InvalidArgumentException(sprintf('A projection state of type "%s" is not registered in this content repository instance.', $projectionStateClassName), 1662033650); } + public function catchUpProjections(): void + { + $store = new SemaphoreStore(); + $factory = new LockFactory($store); + $lock = $factory->createLock('catchup'); + $lock->acquire(true); + #print_r(debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS)); + $lowestAppliedSequenceNumber = null; + + /** @var array, sequenceNumber: SequenceNumber, catchUpHook: CatchUpHookInterface|null}> $projectionsAndCatchUpHooks */ + $projectionsAndCatchUpHooks = []; + foreach ($this->projectionsAndCatchUpHooks->projections as $projectionClassName => $projection) { + $projectionSequenceNumber = $projection->getCheckpoint(); + #\Neos\Flow\var_dump('ACQUIRE LOCK FOR ' . $projectionClassName . ': ' . $projectionSequenceNumber->value); + if ($lowestAppliedSequenceNumber === null || $projectionSequenceNumber->value < $lowestAppliedSequenceNumber->value) { + $lowestAppliedSequenceNumber = $projectionSequenceNumber; + } + $catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection); + $projectionsAndCatchUpHooks[$projectionClassName] = [ + 'projection' => $projection, + 'sequenceNumber' => $projectionSequenceNumber, + 'catchUpHook' => $catchUpHookFactory?->build($this), + ]; + $projectionsAndCatchUpHooks[$projectionClassName]['catchUpHook']?->onBeforeCatchUp(); + } + #\Neos\Flow\var_dump('CATCHUP from ' . $lowestAppliedSequenceNumber->value); + assert($lowestAppliedSequenceNumber instanceof SequenceNumber); + $eventStream = $this->eventStore->load(VirtualStreamName::all())->withMinimumSequenceNumber($lowestAppliedSequenceNumber->next()); + $eventEnvelope = null; + foreach ($eventStream as $eventEnvelope) { + #\Neos\Flow\var_dump('EVENT ' . $eventEnvelope->event->type->value); + $event = $this->eventNormalizer->denormalize($eventEnvelope->event); + /** @var array{projection: ProjectionInterface, sequenceNumber: SequenceNumber, catchUpHook: CatchUpHookInterface|null} $projectionAndCatchUpHook */ + foreach ($projectionsAndCatchUpHooks as $projectionClassName => $projectionAndCatchUpHook) { + if ($projectionAndCatchUpHook['sequenceNumber']->value >= $eventEnvelope->sequenceNumber->value) { + continue; + } + #$projectionAndCatchUpHook['catchUpHook']?->onBeforeEvent($event, $eventEnvelope); + #\Neos\Flow\var_dump('APPLY ' . $eventEnvelope->event->type->value . ' (' . $eventEnvelope->sequenceNumber->value . ') TO ' . $projectionClassName); + #$futures[] = function () => $projectionAndCatchUpHook['projection']->apply($event, $eventEnvelope); + #$projectionAndCatchUpHook['catchUpHook']?->onAfterEvent($event, $eventEnvelope); + $projectionAndCatchUpHook['catchUpHook']?->onBeforeEvent($event, $eventEnvelope); + #\Neos\Flow\var_dump('APPLY ' . $eventEnvelope->event->type->value . ' (' . $eventEnvelope->sequenceNumber->value . ') TO ' . $projectionClassName); + $projectionAndCatchUpHook['projection']->apply($event, $eventEnvelope); + $projectionAndCatchUpHook['catchUpHook']?->onAfterEvent($event, $eventEnvelope); + #$projectionAndCatchUpHook['projection']->getCheckpointStorage()->updateAndReleaseLock($eventEnvelope->sequenceNumber); + #\Neos\Flow\var_dump('UPDATE CHECKPOINT for ' . $projectionClassName . ' to ' . $eventEnvelope->sequenceNumber->value); + } + } + assert($eventEnvelope instanceof EventEnvelope); + /** @var array{projection: ProjectionInterface, sequenceNumber: SequenceNumber, catchUpHook: CatchUpHookInterface|null} $projectionAndCatchUpHook */ + foreach ($projectionsAndCatchUpHooks as $projectionClassName => $projectionAndCatchUpHook) { + $projectionAndCatchUpHook['catchUpHook']?->onBeforeBatchCompleted(); + #$projectionAndCatchUpHook['projection']->getCheckpointStorage()->updateAndReleaseLock($eventEnvelope->sequenceNumber); + #\Neos\Flow\var_dump('UPDATE SQN FOR ' . $projectionClassName . ': ' . $eventEnvelope->sequenceNumber->value); + #\Neos\Flow\var_dump('UPDATE CHECKPOINT for ' . $projectionClassName . ' to ' . $eventEnvelope->sequenceNumber->value); + $projectionAndCatchUpHook['catchUpHook']?->onAfterCatchUp(); + } + $lock->release(); + } + /** * @param class-string> $projectionClassName */ @@ -168,36 +235,26 @@ public function catchUpProjection(string $projectionClassName, CatchUpOptions $o $catchUpHookFactory = $this->projectionsAndCatchUpHooks->getCatchUpHookFactoryForProjection($projection); $catchUpHook = $catchUpHookFactory?->build($this); - // TODO allow custom stream name per projection $streamName = VirtualStreamName::all(); $eventStream = $this->eventStore->load($streamName); if ($options->maximumSequenceNumber !== null) { $eventStream = $eventStream->withMaximumSequenceNumber($options->maximumSequenceNumber); } - - $eventApplier = function (EventEnvelope $eventEnvelope) use ($projection, $catchUpHook, $options) { + foreach ($eventStream as $eventEnvelope) { $event = $this->eventNormalizer->denormalize($eventEnvelope->event); if ($options->progressCallback !== null) { ($options->progressCallback)($event, $eventEnvelope); } - if (!$projection->canHandle($event)) { - return; - } $catchUpHook?->onBeforeEvent($event, $eventEnvelope); $projection->apply($event, $eventEnvelope); + // TODO this should happen in the inner transaction + $catchUpHook?->onBeforeBatchCompleted(); $catchUpHook?->onAfterEvent($event, $eventEnvelope); - }; - - $catchUp = CatchUp::create($eventApplier, $projection->getCheckpointStorage()); - - if ($catchUpHook !== null) { - $catchUpHook->onBeforeCatchUp(); - $catchUp = $catchUp->withOnBeforeBatchCompleted(fn() => $catchUpHook->onBeforeBatchCompleted()); } - $catchUp->run($eventStream); $catchUpHook?->onAfterCatchUp(); } + public function setUp(): void { $this->eventStore->setup(); diff --git a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php index aa0e2abed44..84b80d2f870 100644 --- a/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php +++ b/Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php @@ -4,9 +4,7 @@ namespace Neos\ContentRepository\Core\EventStore; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; -use Neos\ContentRepository\Core\CommandHandler\PendingProjections; -use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; +use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\Projection\Projections; use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface; use Neos\EventStore\EventStoreInterface; @@ -19,51 +17,39 @@ * * @internal */ -final readonly class EventPersister +final class EventPersister { public function __construct( - private EventStoreInterface $eventStore, - private ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger, - private EventNormalizer $eventNormalizer, - private Projections $projections, + private readonly EventStoreInterface $eventStore, + private readonly EventNormalizer $eventNormalizer, + private readonly Projections $projections, ) { } /** * @param EventsToPublish $eventsToPublish - * @return CommandResult * @throws ConcurrencyException in case the expectedVersion does not match */ - public function publishEvents(EventsToPublish $eventsToPublish): CommandResult + public function publishEvents(ContentRepository $contentRepository, EventsToPublish $eventsToPublish): void { if ($eventsToPublish->events->isEmpty()) { - return new CommandResult(); + return; } // the following logic could also be done in an AppEventStore::commit method (being called // directly from the individual Command Handlers). $normalizedEvents = Events::fromArray( $eventsToPublish->events->map($this->eventNormalizer->normalize(...)) ); - $commitResult = $this->eventStore->commit( + $this->eventStore->commit( $eventsToPublish->streamName, $normalizedEvents, $eventsToPublish->expectedVersion ); - // for performance reasons, we do not want to update ALL projections all the time; but instead only - // the projections which are interested in the events from above. - // Further details can be found in the docs of PendingProjections. - $pendingProjections = PendingProjections::fromProjectionsAndEventsAndSequenceNumber( - $this->projections, - $eventsToPublish->events, - $commitResult->highestCommittedSequenceNumber - ); - - foreach ($pendingProjections->projections as $projection) { + foreach ($this->projections as $projection) { if ($projection instanceof WithMarkStaleInterface) { $projection->markStale(); } } - $this->projectionCatchUpTrigger->triggerCatchUp($pendingProjections->projections); - return new CommandResult(); + $contentRepository->catchUpProjections(); } } diff --git a/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php b/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php index bba7d784579..100298b988b 100644 --- a/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php +++ b/Neos.ContentRepository.Core/Classes/Factory/ContentRepositoryFactory.php @@ -28,7 +28,6 @@ use Neos\ContentRepository\Core\Feature\WorkspaceCommandHandler; use Neos\ContentRepository\Core\Infrastructure\Property\PropertyConverter; use Neos\ContentRepository\Core\NodeType\NodeTypeManager; -use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\ProjectionsAndCatchUpHooks; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface; @@ -53,7 +52,6 @@ public function __construct( ContentDimensionSourceInterface $contentDimensionSource, Serializer $propertySerializer, ProjectionsAndCatchUpHooksFactory $projectionsAndCatchUpHooksFactory, - private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger, private readonly UserIdProviderInterface $userIdProvider, private readonly ClockInterface $clock, ) { @@ -166,7 +164,6 @@ private function buildEventPersister(): EventPersister if (!$this->eventPersister) { $this->eventPersister = new EventPersister( $this->projectionFactoryDependencies->eventStore, - $this->projectionCatchUpTrigger, $this->projectionFactoryDependencies->eventNormalizer, $this->projectionsAndCatchUpHooks->projections, ); diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index 1e96149f95f..8c98d1af748 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -16,7 +16,6 @@ use Neos\ContentRepository\Core\CommandHandler\CommandHandlerInterface; use Neos\ContentRepository\Core\CommandHandler\CommandInterface; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\CommandHandlingDependencies; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\EventStore\DecoratedEvent; @@ -240,6 +239,7 @@ private function handlePublishWorkspace( $baseWorkspace = $this->requireBaseWorkspace($workspace, $commandHandlingDependencies->getWorkspaceFinder()); $this->publishContentStream( + $contentRepository, $workspace->currentContentStreamId, $baseWorkspace->workspaceName, $baseWorkspace->currentContentStreamId @@ -276,10 +276,11 @@ private function handlePublishWorkspace( * @throws \Exception */ private function publishContentStream( + ContentRepository $contentRepository, ContentStreamId $contentStreamId, WorkspaceName $baseWorkspaceName, ContentStreamId $baseContentStreamId, - ): ?CommandResult { + ): void { $baseWorkspaceContentStreamName = ContentStreamEventStreamName::fromContentStreamId( $baseContentStreamId ); @@ -324,10 +325,11 @@ private function publishContentStream( } if (count($events) === 0) { - return null; + return; } try { - return $this->eventPersister->publishEvents( + $this->eventPersister->publishEvents( + $contentRepository, new EventsToPublish( $baseWorkspaceContentStreamName->getEventStreamName(), Events::fromArray($events), @@ -540,6 +542,7 @@ function () use ($matchingCommands, $commandHandlingDependencies, $baseWorkspace // 5) take EVENTS(MATCHING) and apply them to base WS. $this->publishContentStream( + $contentRepository, $command->contentStreamIdForMatchingPart, $baseWorkspace->workspaceName, $baseWorkspace->currentContentStreamId diff --git a/Neos.ContentRepository.Core/Classes/Infrastructure/DbalCheckpointStorage.php b/Neos.ContentRepository.Core/Classes/Infrastructure/DbalCheckpointStorage.php deleted file mode 100644 index dac81690564..00000000000 --- a/Neos.ContentRepository.Core/Classes/Infrastructure/DbalCheckpointStorage.php +++ /dev/null @@ -1,168 +0,0 @@ -connection->getDatabasePlatform(); - if (!($platform instanceof MySqlPlatform || $platform instanceof PostgreSqlPlatform)) { - throw new \InvalidArgumentException(sprintf('The %s only supports the platforms %s and %s currently. Given: %s', $this::class, MySqlPlatform::class, PostgreSqlPlatform::class, get_debug_type($platform)), 1660556004); - } - if (strlen($this->subscriberId) > 255) { - throw new \InvalidArgumentException('The subscriberId must not exceed 255 characters', 1705673456); - } - $this->platform = $platform; - } - - public function setUp(): void - { - foreach ($this->determineRequiredSqlStatements() as $statement) { - $this->connection->executeStatement($statement); - } - try { - $this->connection->insert($this->tableName, ['subscriberid' => $this->subscriberId, 'appliedsequencenumber' => 0]); - } catch (UniqueConstraintViolationException $e) { - // table and row already exists, ignore - } - } - - public function status(): CheckpointStorageStatus - { - try { - $this->connection->connect(); - } catch (\Throwable $e) { - return CheckpointStorageStatus::error(sprintf('Failed to connect to database for subscriber "%s": %s', $this->subscriberId, $e->getMessage())); - } - try { - $requiredSqlStatements = $this->determineRequiredSqlStatements(); - } catch (\Throwable $e) { - return CheckpointStorageStatus::error(sprintf('Failed to compare database schema for subscriber "%s": %s', $this->subscriberId, $e->getMessage())); - } - if ($requiredSqlStatements !== []) { - return CheckpointStorageStatus::setupRequired(sprintf('The following SQL statement%s required for subscriber "%s": %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', $this->subscriberId, implode(chr(10), $requiredSqlStatements))); - } - try { - $appliedSequenceNumber = $this->connection->fetchOne('SELECT appliedsequencenumber FROM ' . $this->tableName . ' WHERE subscriberid = :subscriberId', ['subscriberId' => $this->subscriberId]); - } catch (\Throwable $e) { - return CheckpointStorageStatus::error(sprintf('Failed to determine initial applied sequence number for subscriber "%s": %s', $this->subscriberId, $e->getMessage())); - } - if ($appliedSequenceNumber === false) { - return CheckpointStorageStatus::setupRequired(sprintf('Initial initial applied sequence number not set for subscriber "%s"', $this->subscriberId)); - } - return CheckpointStorageStatus::ok(); - } - - public function acquireLock(): SequenceNumber - { - if ($this->connection->isTransactionActive()) { - throw new \RuntimeException(sprintf('Failed to acquire checkpoint lock for subscriber "%s" because a transaction is active already', $this->subscriberId), 1652268416); - } - $this->connection->beginTransaction(); - try { - $highestAppliedSequenceNumber = $this->connection->fetchOne('SELECT appliedsequencenumber FROM ' . $this->connection->quoteIdentifier($this->tableName) . ' WHERE subscriberid = :subscriberId ' . $this->platform->getForUpdateSQL() . ' NOWAIT', [ - 'subscriberId' => $this->subscriberId - ]); - } catch (DBALException $exception) { - $this->connection->rollBack(); - if ($exception instanceof LockWaitTimeoutException || ($exception instanceof DBALDriverException && ($exception->getErrorCode() === 3572 || $exception->getErrorCode() === 7))) { - throw new \RuntimeException(sprintf('Failed to acquire checkpoint lock for subscriber "%s" because it is acquired already', $this->subscriberId), 1652279016); - } - throw new \RuntimeException($exception->getMessage(), 1544207778, $exception); - } - if (!is_numeric($highestAppliedSequenceNumber)) { - $this->connection->rollBack(); - throw new \RuntimeException(sprintf('Failed to fetch highest applied sequence number for subscriber "%s". Please run %s::setUp()', $this->subscriberId, $this::class), 1652279139); - } - $this->lockedSequenceNumber = SequenceNumber::fromInteger((int)$highestAppliedSequenceNumber); - return $this->lockedSequenceNumber; - } - - public function updateAndReleaseLock(SequenceNumber $sequenceNumber): void - { - if ($this->lockedSequenceNumber === null) { - throw new \RuntimeException(sprintf('Failed to update and commit checkpoint for subscriber "%s" because the lock has not been acquired successfully before', $this->subscriberId), 1660556344); - } - if (!$this->connection->isTransactionActive()) { - throw new \RuntimeException(sprintf('Failed to update and commit checkpoint for subscriber "%s" because no transaction is active', $this->subscriberId), 1652279314); - } - if ($this->connection->isRollbackOnly()) { - // TODO as described in https://github.com/neos/neos-development-collection/issues/4970 we are in a bad state and cannot commit after a nested transaction was rolled back. - throw new \RuntimeException(sprintf('Failed to update and commit checkpoint for subscriber "%s" because the transaction has been marked for rollback only. See https://github.com/neos/neos-development-collection/issues/4970', $this->subscriberId), 1711964313); - } - try { - if (!$this->lockedSequenceNumber->equals($sequenceNumber)) { - $this->connection->update($this->tableName, ['appliedsequencenumber' => $sequenceNumber->value], ['subscriberid' => $this->subscriberId]); - } - $this->connection->commit(); - } catch (DBALException $exception) { - $this->connection->rollBack(); - throw new \RuntimeException(sprintf('Failed to update and commit highest applied sequence number for subscriber "%s". Please run %s::setUp()', $this->subscriberId, $this::class), 1652279375, $exception); - } finally { - $this->lockedSequenceNumber = null; - } - } - - public function getHighestAppliedSequenceNumber(): SequenceNumber - { - $highestAppliedSequenceNumber = $this->connection->fetchOne('SELECT appliedsequencenumber FROM ' . $this->connection->quoteIdentifier($this->tableName) . ' WHERE subscriberid = :subscriberId ', [ - 'subscriberId' => $this->subscriberId - ]); - if (!is_numeric($highestAppliedSequenceNumber)) { - throw new \RuntimeException(sprintf('Failed to fetch highest applied sequence number for subscriber "%s". Please run %s::setUp()', $this->subscriberId, $this::class), 1652279427); - } - return SequenceNumber::fromInteger((int)$highestAppliedSequenceNumber); - } - - // -------------- - - /** - * @return array - */ - private function determineRequiredSqlStatements(): array - { - $schemaManager = $this->connection->getSchemaManager(); - if (!$schemaManager instanceof AbstractSchemaManager) { - throw new \RuntimeException('Failed to retrieve Schema Manager', 1705681161); - } - $tableSchema = new Table( - $this->tableName, - [ - (new Column('subscriberid', Type::getType(Types::STRING)))->setLength(255), - (new Column('appliedsequencenumber', Type::getType(Types::INTEGER))) - ] - ); - $tableSchema->setPrimaryKey(['subscriberid']); - $schema = DbalSchemaFactory::createSchemaWithTables($schemaManager, [$tableSchema]); - return DbalSchemaDiff::determineRequiredSqlStatements($this->connection, $schema); - } -} diff --git a/Neos.ContentRepository.Core/Classes/Projection/CatchUp.php b/Neos.ContentRepository.Core/Classes/Projection/CatchUp.php deleted file mode 100644 index b997978bfe7..00000000000 --- a/Neos.ContentRepository.Core/Classes/Projection/CatchUp.php +++ /dev/null @@ -1,134 +0,0 @@ -batchSize < 1) { - throw new \InvalidArgumentException(sprintf('batch size must be a positive integer, given: %d', $this->batchSize), 1705672467); - } - } - - /** - * @param \Closure(EventEnvelope): void $eventHandler The callback that is invoked for every {@see EventEnvelope} that is processed - * @param CheckpointStorageInterface $checkpointStorage The checkpoint storage that saves the last processed {@see SequenceNumber} - */ - public static function create(\Closure $eventHandler, CheckpointStorageInterface $checkpointStorage): self - { - return new self($eventHandler, $checkpointStorage, 1, null); - } - - /** - * After how many events should the (database) transaction be committed? - * - * @param int $batchSize Number of events to process before the checkpoint is written - */ - public function withBatchSize(int $batchSize): self - { - if ($batchSize === $this->batchSize) { - return $this; - } - return new self($this->eventHandler, $this->checkpointStorage, $batchSize, $this->onBeforeBatchCompletedHook); - } - - /** - * This hook is called directly before the sequence number is persisted back in CheckpointStorage. - * Use this to trigger any operation which need to happen BEFORE the sequence number update is made - * visible to the outside. - * - * Overrides all previously registered onBeforeBatchCompleted hooks. - * - * @param \Closure(): void $callback the hook being called before the batch is completed - */ - public function withOnBeforeBatchCompleted(\Closure $callback): self - { - return new self($this->eventHandler, $this->checkpointStorage, $this->batchSize, $callback); - } - - /** - * Iterate over the $eventStream, invoke the specified event handler closure for every {@see EventEnvelope} and update - * the last processed sequence number in the {@see CheckpointStorageInterface} - * - * @param EventStreamInterface $eventStream The event stream to process - * @return SequenceNumber The last processed {@see SequenceNumber} - * @throws \Throwable Exceptions that are thrown during callback handling are re-thrown - */ - public function run(EventStreamInterface $eventStream): SequenceNumber - { - $highestAppliedSequenceNumber = $this->checkpointStorage->acquireLock(); - $iteration = 0; - try { - foreach ($eventStream->withMinimumSequenceNumber($highestAppliedSequenceNumber->next()) as $eventEnvelope) { - if ($eventEnvelope->sequenceNumber->value <= $highestAppliedSequenceNumber->value) { - continue; - } - try { - ($this->eventHandler)($eventEnvelope); - } catch (\Exception $e) { - throw new \RuntimeException(sprintf('Exception while catching up to sequence number %d', $eventEnvelope->sequenceNumber->value), 1710707311, $e); - } - $iteration++; - if ($this->batchSize === 1 || $iteration % $this->batchSize === 0) { - if ($this->onBeforeBatchCompletedHook) { - ($this->onBeforeBatchCompletedHook)(); - } - $this->checkpointStorage->updateAndReleaseLock($eventEnvelope->sequenceNumber); - $highestAppliedSequenceNumber = $this->checkpointStorage->acquireLock(); - } else { - $highestAppliedSequenceNumber = $eventEnvelope->sequenceNumber; - } - } - } finally { - try { - if ($this->onBeforeBatchCompletedHook) { - ($this->onBeforeBatchCompletedHook)(); - } - } finally { - $this->checkpointStorage->updateAndReleaseLock($highestAppliedSequenceNumber); - } - } - return $highestAppliedSequenceNumber; - } -} diff --git a/Neos.ContentRepository.Core/Classes/Projection/CatchUpHookInterface.php b/Neos.ContentRepository.Core/Classes/Projection/CatchUpHookInterface.php index b487fc21b72..5d04bbc7ad8 100644 --- a/Neos.ContentRepository.Core/Classes/Projection/CatchUpHookInterface.php +++ b/Neos.ContentRepository.Core/Classes/Projection/CatchUpHookInterface.php @@ -2,8 +2,8 @@ namespace Neos\ContentRepository\Core\Projection; +use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\EventStore\EventInterface; -use Neos\EventStore\CatchUp\CheckpointStorageInterface; use Neos\EventStore\Model\EventEnvelope; /** @@ -17,8 +17,7 @@ interface CatchUpHookInterface { /** - * This hook is called at the beginning of {@see ProjectionInterface::catchUpProjection()}; - * BEFORE the Database Lock is acquired (by {@see CheckpointStorageInterface::acquireLock()}). + * This hook is called at the beginning of {@see ContentRepository::catchUpProjection()}; * * @return void */ @@ -37,8 +36,8 @@ public function onBeforeEvent(EventInterface $eventInstance, EventEnvelope $even public function onAfterEvent(EventInterface $eventInstance, EventEnvelope $eventEnvelope): void; /** - * This hook is called directly before the database lock is RELEASED - * in {@see CheckpointStorageInterface::updateAndReleaseLock()}. + * This hook is called directly before the catchup is done + * in {@see ContentRepository::catchUpProjection()}. * * It can happen that this method is called multiple times, even without * having seen Events in the meantime. diff --git a/Neos.ContentRepository.Core/Classes/Projection/CheckpointStorageInterface.php b/Neos.ContentRepository.Core/Classes/Projection/CheckpointStorageInterface.php deleted file mode 100644 index 43dc37f8ab7..00000000000 --- a/Neos.ContentRepository.Core/Classes/Projection/CheckpointStorageInterface.php +++ /dev/null @@ -1,61 +0,0 @@ -type, $details); - } -} diff --git a/Neos.ContentRepository.Core/Classes/Projection/CheckpointStorageStatusType.php b/Neos.ContentRepository.Core/Classes/Projection/CheckpointStorageStatusType.php deleted file mode 100644 index 3e138d4348d..00000000000 --- a/Neos.ContentRepository.Core/Classes/Projection/CheckpointStorageStatusType.php +++ /dev/null @@ -1,12 +0,0 @@ -checkpointStorage = new DbalCheckpointStorage( - $this->dbal, - $this->tableName . '_checkpoint', - self::class - ); } public function setUp(): void @@ -87,18 +79,10 @@ public function setUp(): void foreach ($statements as $statement) { $this->dbal->executeStatement($statement); } - $this->checkpointStorage->setUp(); } public function status(): ProjectionStatus { - $checkpointStorageStatus = $this->checkpointStorage->status(); - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) { - return ProjectionStatus::error($checkpointStorageStatus->details); - } - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) { - return ProjectionStatus::setupRequired($checkpointStorageStatus->details); - } try { $this->dbal->connect(); } catch (\Throwable $e) { @@ -112,6 +96,11 @@ public function status(): ProjectionStatus if ($requiredSqlStatements !== []) { return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements))); } + try { + $this->getCheckpoint(); + } catch (\Exception $exception) { + return ProjectionStatus::error('Error while retrieving checkpoint: ' . $exception->getMessage()); + } return ProjectionStatus::ok(); } @@ -133,7 +122,8 @@ private function determineRequiredSqlStatements(): array // Should become a DB ENUM (unclear how to configure with DBAL) or int (latter needs adaption to code) (new Column('state', Type::getType(Types::BINARY)))->setLength(20)->setNotnull(true), (new Column('removed', Type::getType(Types::BOOLEAN)))->setDefault(false)->setNotnull(false) - ])) + ])), + CheckpointHelper::checkpointTableSchema($this->tableName . '_checkpoint') ]); return DbalSchemaDiff::determineRequiredSqlStatements($this->dbal, $schema); @@ -142,35 +132,16 @@ private function determineRequiredSqlStatements(): array public function reset(): void { $this->dbal->executeStatement('TRUNCATE table ' . $this->tableName); - $this->checkpointStorage->acquireLock(); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::none()); - } - - public function canHandle(EventInterface $event): bool - { - return in_array($event::class, [ - ContentStreamWasCreated::class, - RootWorkspaceWasCreated::class, - WorkspaceWasCreated::class, - ContentStreamWasForked::class, - WorkspaceWasDiscarded::class, - WorkspaceWasPartiallyDiscarded::class, - WorkspaceWasPartiallyPublished::class, - WorkspaceWasPublished::class, - WorkspaceWasRebased::class, - WorkspaceRebaseFailed::class, - ContentStreamWasClosed::class, - ContentStreamWasReopened::class, - ContentStreamWasRemoved::class, - DimensionShineThroughWasAdded::class, - ]) - || $event instanceof EmbedsContentStreamAndNodeAggregateId; + CheckpointHelper::resetCheckpoint($this->dbal, $this->tableName . '_checkpoint'); } public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void { + $this->dbal->beginTransaction(); if ($event instanceof EmbedsContentStreamAndNodeAggregateId) { $this->updateContentStreamVersion($event, $eventEnvelope); + CheckpointHelper::updateCheckpoint($this->dbal, $this->tableName . '_checkpoint', $eventEnvelope->sequenceNumber); + $this->dbal->commit(); return; } match ($event::class) { @@ -188,13 +159,15 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void ContentStreamWasReopened::class => $this->whenContentStreamWasReopened($event, $eventEnvelope), ContentStreamWasRemoved::class => $this->whenContentStreamWasRemoved($event, $eventEnvelope), DimensionShineThroughWasAdded::class => $this->whenDimensionShineThroughWasAdded($event, $eventEnvelope), - default => throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))), + default => null, }; + CheckpointHelper::updateCheckpoint($this->dbal, $this->tableName . '_checkpoint', $eventEnvelope->sequenceNumber); + $this->dbal->commit(); } - public function getCheckpointStorage(): CheckpointStorageInterface + public function getCheckpoint(): SequenceNumber { - return $this->checkpointStorage; + return CheckpointHelper::getCheckpoint($this->dbal, $this->tableName . '_checkpoint'); } public function getState(): ProjectionStateInterface diff --git a/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpTriggerInterface.php b/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpTriggerInterface.php deleted file mode 100644 index 1fb20d34cbe..00000000000 --- a/Neos.ContentRepository.Core/Classes/Projection/ProjectionCatchUpTriggerInterface.php +++ /dev/null @@ -1,20 +0,0 @@ -checkpointStorage = new DbalCheckpointStorage( - $this->dbal, - $this->tableName . '_checkpoint', - self::class - ); $this->workspaceRuntimeCache = new WorkspaceRuntimeCache(); } @@ -78,18 +71,10 @@ public function setUp(): void foreach ($this->determineRequiredSqlStatements() as $statement) { $this->dbal->executeStatement($statement); } - $this->checkpointStorage->setUp(); } public function status(): ProjectionStatus { - $checkpointStorageStatus = $this->checkpointStorage->status(); - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) { - return ProjectionStatus::error($checkpointStorageStatus->details); - } - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) { - return ProjectionStatus::setupRequired($checkpointStorageStatus->details); - } try { $this->dbal->connect(); } catch (\Throwable $e) { @@ -103,6 +88,11 @@ public function status(): ProjectionStatus if ($requiredSqlStatements !== []) { return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements))); } + try { + $this->getCheckpoint(); + } catch (\Exception $exception) { + return ProjectionStatus::error('Error while retrieving checkpoint: ' . $exception->getMessage()); + } return ProjectionStatus::ok(); } @@ -126,38 +116,21 @@ private function determineRequiredSqlStatements(): array (new Column('status', Type::getType(Types::BINARY)))->setLength(20)->setNotnull(false) ]); $workspaceTable->setPrimaryKey(['workspacename']); + $checkpointTable = CheckpointHelper::checkpointTableSchema($this->tableName . '_checkpoint'); - $schema = DbalSchemaFactory::createSchemaWithTables($schemaManager, [$workspaceTable]); + $schema = DbalSchemaFactory::createSchemaWithTables($schemaManager, [$workspaceTable, $checkpointTable]); return DbalSchemaDiff::determineRequiredSqlStatements($this->dbal, $schema); } public function reset(): void { $this->dbal->exec('TRUNCATE ' . $this->tableName); - $this->checkpointStorage->acquireLock(); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::none()); - } - - public function canHandle(EventInterface $event): bool - { - return in_array($event::class, [ - WorkspaceWasCreated::class, - WorkspaceWasRenamed::class, - RootWorkspaceWasCreated::class, - WorkspaceWasDiscarded::class, - WorkspaceWasPartiallyDiscarded::class, - WorkspaceWasPartiallyPublished::class, - WorkspaceWasPublished::class, - WorkspaceWasRebased::class, - WorkspaceRebaseFailed::class, - WorkspaceWasRemoved::class, - WorkspaceOwnerWasChanged::class, - WorkspaceBaseWorkspaceWasChanged::class, - ]); + CheckpointHelper::resetCheckpoint($this->dbal, $this->tableName . '_checkpoint'); } public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void { + $this->dbal->beginTransaction(); match ($event::class) { WorkspaceWasCreated::class => $this->whenWorkspaceWasCreated($event), WorkspaceWasRenamed::class => $this->whenWorkspaceWasRenamed($event), @@ -171,13 +144,15 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void WorkspaceWasRemoved::class => $this->whenWorkspaceWasRemoved($event), WorkspaceOwnerWasChanged::class => $this->whenWorkspaceOwnerWasChanged($event), WorkspaceBaseWorkspaceWasChanged::class => $this->whenWorkspaceBaseWorkspaceWasChanged($event), - default => throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))), + default => null, }; + CheckpointHelper::updateCheckpoint($this->dbal, $this->tableName . '_checkpoint', $eventEnvelope->sequenceNumber); + $this->dbal->commit(); } - public function getCheckpointStorage(): DbalCheckpointStorage + public function getCheckpoint(): SequenceNumber { - return $this->checkpointStorage; + return CheckpointHelper::getCheckpoint($this->dbal, $this->tableName . '_checkpoint'); } public function getState(): WorkspaceFinder diff --git a/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php b/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php index eb5f5540917..ac6c6653c93 100644 --- a/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php +++ b/Neos.ContentRepository.Core/Classes/Service/ContentStreamPruner.php @@ -4,7 +4,6 @@ namespace Neos\ContentRepository\Core\Service; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceInterface; use Neos\ContentRepository\Core\Feature\ContentStreamEventStreamName; @@ -20,8 +19,6 @@ */ class ContentStreamPruner implements ContentRepositoryServiceInterface { - private ?CommandResult $lastCommandResult; - public function __construct( private readonly ContentRepository $contentRepository, private readonly EventStoreInterface $eventStore, @@ -51,7 +48,7 @@ public function prune(bool $removeTemporary = false): iterable ); foreach ($unusedContentStreams as $contentStream) { - $this->lastCommandResult = $this->contentRepository->handle( + $this->contentRepository->handle( RemoveContentStream::create($contentStream) ); } @@ -93,9 +90,4 @@ public function pruneAll(): void $this->eventStore->deleteStream($streamName); } } - - public function getLastCommandResult(): ?CommandResult - { - return $this->lastCommandResult; - } } diff --git a/Neos.ContentRepository.Core/Tests/Unit/Infrastructure/DbalSchemaDiffTest.php b/Neos.ContentRepository.Core/Tests/Unit/Infrastructure/DbalSchemaDiffTest.php index 7914d76355d..f919bb1c27c 100644 --- a/Neos.ContentRepository.Core/Tests/Unit/Infrastructure/DbalSchemaDiffTest.php +++ b/Neos.ContentRepository.Core/Tests/Unit/Infrastructure/DbalSchemaDiffTest.php @@ -9,7 +9,7 @@ use Doctrine\DBAL\Schema\Table; use Doctrine\DBAL\Types\Type; use Doctrine\DBAL\Types\Types; -use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff; +use Neos\ContentRepository\DbalTools\DbalSchemaDiff; use PHPUnit\Framework\TestCase; class DbalSchemaDiffTest extends TestCase diff --git a/Neos.ContentRepository.Core/composer.json b/Neos.ContentRepository.Core/composer.json index 56bdba56ad6..5d60bb7a02b 100644 --- a/Neos.ContentRepository.Core/composer.json +++ b/Neos.ContentRepository.Core/composer.json @@ -11,17 +11,18 @@ "GPL-3.0-or-later" ], "require": { + "php": "^8.2", "neos/eventstore": "^1", "neos/eventstore-doctrineadapter": "^1 || ^2", - "php": "^8.2", "neos/error-messages": "*", "neos/utility-objecthandling": "*", "neos/utility-arrays": "*", - "doctrine/dbal": "^2.13", "symfony/serializer": "^6.3", "psr/clock": "^1", - "behat/transliterator": "~1.0", - "ramsey/uuid": "^3.0 || ^4.0" + "behat/transliterator": "^1", + "ramsey/uuid": "^3 || ^4", + "symfony/lock": "^6", + "neos/contentrepository-dbaltools": "*" }, "require-dev": { "roave/security-advisories": "dev-latest", diff --git a/Neos.ContentRepository.DbalTools/Classes/CheckpointHelper.php b/Neos.ContentRepository.DbalTools/Classes/CheckpointHelper.php new file mode 100644 index 00000000000..4f492ea2289 --- /dev/null +++ b/Neos.ContentRepository.DbalTools/Classes/CheckpointHelper.php @@ -0,0 +1,59 @@ +setPlatformOption('check', 'CHECK (id = 0)'), + (new Column('appliedsequencenumber', Type::getType(Types::INTEGER))), + ])) + ->setPrimaryKey(['id']); + } + + public static function resetCheckpoint(Connection $connection, string $tableName): void + { + $connection->executeStatement('INSERT INTO ' . $connection->quoteIdentifier($tableName) . ' (id, appliedsequencenumber) VALUES (0, 0) ON DUPLICATE KEY UPDATE appliedsequencenumber = 0'); + } + + public static function updateCheckpoint(Connection $connection, string $tableName, SequenceNumber $sequenceNumber): void + { + $connection->executeStatement('UPDATE ' . $connection->quoteIdentifier($tableName) . ' SET appliedsequencenumber = :sequenceNumber WHERE id = 0', ['sequenceNumber' => $sequenceNumber->value]); + } + + public static function getCheckpoint(Connection $connection, string $tableName): SequenceNumber + { + $highestAppliedSequenceNumber = $connection->fetchOne('SELECT appliedsequencenumber FROM ' . $connection->quoteIdentifier($tableName) . ' LIMIT 1 '); + if (!is_numeric($highestAppliedSequenceNumber)) { + throw new \RuntimeException('Failed to fetch highest applied sequence number', 1712942681); + } + return SequenceNumber::fromInteger((int)$highestAppliedSequenceNumber); + } + +} diff --git a/Neos.ContentRepository.Core/Classes/Infrastructure/DbalSchemaDiff.php b/Neos.ContentRepository.DbalTools/Classes/DbalSchemaDiff.php similarity index 97% rename from Neos.ContentRepository.Core/Classes/Infrastructure/DbalSchemaDiff.php rename to Neos.ContentRepository.DbalTools/Classes/DbalSchemaDiff.php index 1af625d9def..3202bc4067a 100644 --- a/Neos.ContentRepository.Core/Classes/Infrastructure/DbalSchemaDiff.php +++ b/Neos.ContentRepository.DbalTools/Classes/DbalSchemaDiff.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Neos\ContentRepository\Core\Infrastructure; +namespace Neos\ContentRepository\DbalTools; use Doctrine\DBAL\Connection; use Doctrine\DBAL\Exception; diff --git a/Neos.ContentRepository.Core/Classes/Infrastructure/DbalSchemaFactory.php b/Neos.ContentRepository.DbalTools/Classes/DbalSchemaFactory.php similarity index 98% rename from Neos.ContentRepository.Core/Classes/Infrastructure/DbalSchemaFactory.php rename to Neos.ContentRepository.DbalTools/Classes/DbalSchemaFactory.php index 8f2d455f009..cefa41373de 100644 --- a/Neos.ContentRepository.Core/Classes/Infrastructure/DbalSchemaFactory.php +++ b/Neos.ContentRepository.DbalTools/Classes/DbalSchemaFactory.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Neos\ContentRepository\Core\Infrastructure; +namespace Neos\ContentRepository\DbalTools; use Doctrine\DBAL\Schema\AbstractSchemaManager; use Doctrine\DBAL\Schema\Column; @@ -115,7 +115,7 @@ public static function columnForNodeTypeName(string $columnName): Column ->setCustomSchemaOption('collation', 'ascii_general_ci'); } - /** + /** * @param AbstractSchemaManager $schemaManager * @param Table[] $tables * @return Schema diff --git a/Neos.ContentRepository.DbalTools/composer.json b/Neos.ContentRepository.DbalTools/composer.json new file mode 100644 index 00000000000..702e11124fe --- /dev/null +++ b/Neos.ContentRepository.DbalTools/composer.json @@ -0,0 +1,34 @@ +{ + "name": "neos/contentrepository-dbaltools", + "type": "library", + "description": "Collection of Doctrine DBAL related tools for the neos/contentrepository-core", + "support": { + "source": "https://github.com/neos/contentrepository-dbaltools.git", + "forum": "https://discuss.neos.io/", + "docs": "https://docs.neos.io/" + }, + "license": [ + "GPL-3.0-or-later" + ], + "require": { + "php": "^8.2", + "doctrine/dbal": "^2 || ^3", + "neos/contentrepository-core": "*" + }, + "require-dev": { + "roave/security-advisories": "dev-latest", + "phpstan/phpstan": "^1.5", + "squizlabs/php_codesniffer": "^3.6", + "phpunit/phpunit": "^9.0" + }, + "autoload": { + "psr-4": { + "Neos\\ContentRepository\\DbalTools\\": "Classes" + } + }, + "autoload-dev": { + "psr-4": { + "Neos\\ContentRepository\\DbalTools\\Tests\\": "Tests" + } + } +} diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/AddDimensionShineThroughTransformationFactory.php b/Neos.ContentRepository.NodeMigration/src/Transformation/AddDimensionShineThroughTransformationFactory.php index 824dcae90a4..f8decab4901 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/AddDimensionShineThroughTransformationFactory.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/AddDimensionShineThroughTransformationFactory.php @@ -14,7 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint; use Neos\ContentRepository\Core\Feature\DimensionSpaceAdjustment\Command\AddDimensionShineThrough; @@ -50,8 +49,8 @@ public function __construct( public function execute( WorkspaceName $workspaceNameForWriting, - ): CommandResult { - return $this->contentRepository->handle( + ): void { + $this->contentRepository->handle( AddDimensionShineThrough::create( $workspaceNameForWriting, $this->from, diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/AddNewPropertyTransformationFactory.php b/Neos.ContentRepository.NodeMigration/src/Transformation/AddNewPropertyTransformationFactory.php index 778d3cd8d14..b6ad21e946a 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/AddNewPropertyTransformationFactory.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/AddNewPropertyTransformationFactory.php @@ -14,7 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet; use Neos\ContentRepository\Core\Feature\NodeModification\Command\SetSerializedNodeProperties; @@ -59,14 +58,14 @@ public function execute( DimensionSpacePointSet $coveredDimensionSpacePoints, WorkspaceName $workspaceNameForWriting, ContentStreamId $contentStreamForWriting - ): ?CommandResult { + ): void { if ($this->serializedValue === null) { // we don't need to unset a non-existing property - return null; + return; } if (!$node->hasProperty($this->newPropertyName)) { - return $this->contentRepository->handle( + $this->contentRepository->handle( SetSerializedNodeProperties::create( $workspaceNameForWriting, $node->aggregateId, @@ -81,8 +80,6 @@ public function execute( ) ); } - - return null; } }; } diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/ChangeNodeTypeTransformationFactory.php b/Neos.ContentRepository.NodeMigration/src/Transformation/ChangeNodeTypeTransformationFactory.php index 2c584214551..42ec7df2b4c 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/ChangeNodeTypeTransformationFactory.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/ChangeNodeTypeTransformationFactory.php @@ -14,7 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\Feature\NodeTypeChange\Command\ChangeNodeAggregateType; use Neos\ContentRepository\Core\Feature\NodeTypeChange\Dto\NodeAggregateTypeChangeChildConstraintConflictResolutionStrategy; @@ -65,8 +64,8 @@ public function execute( NodeAggregate $nodeAggregate, WorkspaceName $workspaceNameForWriting, ContentStreamId $contentStreamForWriting - ): CommandResult { - return $this->contentRepository->handle(ChangeNodeAggregateType::create( + ): void { + $this->contentRepository->handle(ChangeNodeAggregateType::create( $workspaceNameForWriting, $nodeAggregate->nodeAggregateId, NodeTypeName::fromString($this->newType), diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/ChangePropertyValueTransformationFactory.php b/Neos.ContentRepository.NodeMigration/src/Transformation/ChangePropertyValueTransformationFactory.php index 41712b0325c..c0b51852b6b 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/ChangePropertyValueTransformationFactory.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/ChangePropertyValueTransformationFactory.php @@ -14,7 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet; use Neos\ContentRepository\Core\Feature\NodeModification\Command\SetSerializedNodeProperties; @@ -106,7 +105,7 @@ public function execute( DimensionSpacePointSet $coveredDimensionSpacePoints, WorkspaceName $workspaceNameForWriting, ContentStreamId $contentStreamForWriting - ): ?CommandResult { + ): void { $currentProperty = $node->properties->serialized()->getProperty($this->propertyName); if ($currentProperty !== null) { $value = $currentProperty->value; @@ -128,7 +127,7 @@ public function execute( $newValueWithReplacedCurrentValue ); - return $this->contentRepository->handle( + $this->contentRepository->handle( SetSerializedNodeProperties::create( $workspaceNameForWriting, $node->aggregateId, @@ -143,8 +142,6 @@ public function execute( ) ); } - - return null; } }; } diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/GlobalTransformationInterface.php b/Neos.ContentRepository.NodeMigration/src/Transformation/GlobalTransformationInterface.php index a8017546f84..7fed0930401 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/GlobalTransformationInterface.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/GlobalTransformationInterface.php @@ -14,8 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; -use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; /** @@ -27,5 +25,5 @@ interface GlobalTransformationInterface { public function execute( WorkspaceName $workspaceNameForWriting, - ): CommandResult; + ): void; } diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/MoveDimensionSpacePointTransformationFactory.php b/Neos.ContentRepository.NodeMigration/src/Transformation/MoveDimensionSpacePointTransformationFactory.php index eeda40a3ea5..0b7cc1b7679 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/MoveDimensionSpacePointTransformationFactory.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/MoveDimensionSpacePointTransformationFactory.php @@ -14,7 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint; use Neos\ContentRepository\Core\Feature\DimensionSpaceAdjustment\Command\MoveDimensionSpacePoint; @@ -48,8 +47,8 @@ public function __construct( public function execute( WorkspaceName $workspaceNameForWriting, - ): CommandResult { - return $this->contentRepository->handle( + ): void { + $this->contentRepository->handle( MoveDimensionSpacePoint::create( $workspaceNameForWriting, $this->from, diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/NodeAggregateBasedTransformationInterface.php b/Neos.ContentRepository.NodeMigration/src/Transformation/NodeAggregateBasedTransformationInterface.php index e5421f8114a..8b2ebd2787c 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/NodeAggregateBasedTransformationInterface.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/NodeAggregateBasedTransformationInterface.php @@ -14,7 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\Projection\ContentGraph\NodeAggregate; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; use Neos\ContentRepository\Core\SharedModel\Workspace\WorkspaceName; @@ -30,5 +29,5 @@ public function execute( NodeAggregate $nodeAggregate, WorkspaceName $workspaceNameForWriting, ContentStreamId $contentStreamForWriting - ): CommandResult; + ): void; } diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/NodeBasedTransformationInterface.php b/Neos.ContentRepository.NodeMigration/src/Transformation/NodeBasedTransformationInterface.php index 88a1573f674..1b2ab86ae4f 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/NodeBasedTransformationInterface.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/NodeBasedTransformationInterface.php @@ -14,7 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet; use Neos\ContentRepository\Core\Projection\ContentGraph\Node; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; @@ -32,5 +31,5 @@ public function execute( DimensionSpacePointSet $coveredDimensionSpacePoints, WorkspaceName $workspaceNameForWriting, ContentStreamId $contentStreamForWriting - ): ?CommandResult; + ): void; } diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/RemoveNodeTransformationFactory.php b/Neos.ContentRepository.NodeMigration/src/Transformation/RemoveNodeTransformationFactory.php index 53a2276fa3f..fbd0a2b46fa 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/RemoveNodeTransformationFactory.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/RemoveNodeTransformationFactory.php @@ -14,7 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePoint; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet; @@ -66,7 +65,7 @@ public function execute( DimensionSpacePointSet $coveredDimensionSpacePoints, WorkspaceName $workspaceNameForWriting, ContentStreamId $contentStreamForWriting - ): ?CommandResult { + ): void { if ($this->strategy === null) { $this->strategy = NodeVariantSelectionStrategy::STRATEGY_ALL_SPECIALIZATIONS; } @@ -84,10 +83,10 @@ public function execute( if (!$coveredDimensionSpacePoints->contains($coveredDimensionSpacePoint)) { // we are currently in a Node which has other covered dimension space points than the target ones, // so we do not need to do anything. - return null; + return; } - return $this->contentRepository->handle(RemoveNodeAggregate::create( + $this->contentRepository->handle(RemoveNodeAggregate::create( $workspaceNameForWriting, $node->aggregateId, $coveredDimensionSpacePoint, diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/RemovePropertyTransformationFactory.php b/Neos.ContentRepository.NodeMigration/src/Transformation/RemovePropertyTransformationFactory.php index 46d7e3255ca..c0990547e2d 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/RemovePropertyTransformationFactory.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/RemovePropertyTransformationFactory.php @@ -14,7 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet; use Neos\ContentRepository\Core\Feature\NodeModification\Command\SetSerializedNodeProperties; @@ -54,9 +53,9 @@ public function execute( DimensionSpacePointSet $coveredDimensionSpacePoints, WorkspaceName $workspaceNameForWriting, ContentStreamId $contentStreamForWriting - ): ?CommandResult { + ): void { if ($node->hasProperty($this->propertyName)) { - return $this->contentRepository->handle( + $this->contentRepository->handle( SetSerializedNodeProperties::create( $workspaceNameForWriting, $node->aggregateId, @@ -66,8 +65,6 @@ public function execute( ) ); } - - return null; } }; } diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/RenameNodeAggregateTransformationFactory.php b/Neos.ContentRepository.NodeMigration/src/Transformation/RenameNodeAggregateTransformationFactory.php index 207a34b3869..3e05d002d1b 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/RenameNodeAggregateTransformationFactory.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/RenameNodeAggregateTransformationFactory.php @@ -14,7 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\Feature\NodeRenaming\Command\ChangeNodeAggregateName; use Neos\ContentRepository\Core\Projection\ContentGraph\NodeAggregate; @@ -50,8 +49,8 @@ public function execute( NodeAggregate $nodeAggregate, WorkspaceName $workspaceNameForWriting, ContentStreamId $contentStreamForWriting - ): CommandResult { - return $this->contentRepository->handle(ChangeNodeAggregateName::create( + ): void { + $this->contentRepository->handle(ChangeNodeAggregateName::create( $workspaceNameForWriting, $nodeAggregate->nodeAggregateId, NodeName::fromString($this->newNodeName), diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/RenamePropertyTransformationFactory.php b/Neos.ContentRepository.NodeMigration/src/Transformation/RenamePropertyTransformationFactory.php index 0d1cc13ed67..ff70a13bd7f 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/RenamePropertyTransformationFactory.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/RenamePropertyTransformationFactory.php @@ -14,7 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet; use Neos\ContentRepository\Core\Feature\NodeModification\Command\SetSerializedNodeProperties; @@ -61,11 +60,11 @@ public function execute( DimensionSpacePointSet $coveredDimensionSpacePoints, WorkspaceName $workspaceNameForWriting, ContentStreamId $contentStreamForWriting - ): ?CommandResult + ): void { $serializedPropertyValue = $node->properties->serialized()->getProperty($this->from); if ($serializedPropertyValue !== null) { - return $this->contentRepository->handle( + $this->contentRepository->handle( SetSerializedNodeProperties::create( $workspaceNameForWriting, $node->aggregateId, @@ -77,8 +76,6 @@ public function execute( ) ); } - - return null; } }; } diff --git a/Neos.ContentRepository.NodeMigration/src/Transformation/StripTagsOnPropertyTransformationFactory.php b/Neos.ContentRepository.NodeMigration/src/Transformation/StripTagsOnPropertyTransformationFactory.php index f071722b34f..9803c9509b2 100644 --- a/Neos.ContentRepository.NodeMigration/src/Transformation/StripTagsOnPropertyTransformationFactory.php +++ b/Neos.ContentRepository.NodeMigration/src/Transformation/StripTagsOnPropertyTransformationFactory.php @@ -14,7 +14,6 @@ namespace Neos\ContentRepository\NodeMigration\Transformation; -use Neos\ContentRepository\Core\CommandHandler\CommandResult; use Neos\ContentRepository\Core\ContentRepository; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet; use Neos\ContentRepository\Core\Feature\NodeModification\Command\SetSerializedNodeProperties; @@ -55,7 +54,7 @@ public function execute( DimensionSpacePointSet $coveredDimensionSpacePoints, WorkspaceName $workspaceNameForWriting, ContentStreamId $contentStreamForWriting - ): ?CommandResult { + ): void { $serializedPropertyValue = $node->properties->serialized()->getProperty($this->propertyName); if ($serializedPropertyValue !== null) { $propertyValue = $serializedPropertyValue->value; @@ -66,7 +65,7 @@ public function execute( ); } $newValue = strip_tags($propertyValue); - return $this->contentRepository->handle( + $this->contentRepository->handle( SetSerializedNodeProperties::create( $workspaceNameForWriting, $node->aggregateId, @@ -81,8 +80,6 @@ public function execute( ) ); } - - return null; } }; } diff --git a/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php b/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php index 9d2e39975ad..68d33d77ee8 100644 --- a/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php +++ b/Neos.ContentRepository.StructureAdjustment/src/StructureAdjustmentService.php @@ -95,7 +95,7 @@ public function fixError(StructureAdjustment $adjustment): void $remediation = $adjustment->remediation; $eventsToPublish = $remediation(); assert($eventsToPublish instanceof EventsToPublish); - $this->eventPersister->publishEvents($eventsToPublish); + $this->eventPersister->publishEvents($this->contentRepository, $eventsToPublish); } } } diff --git a/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/GenericCommandExecutionAndEventPublication.php b/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/GenericCommandExecutionAndEventPublication.php index c8e2118d5e4..bc49c4c9e37 100644 --- a/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/GenericCommandExecutionAndEventPublication.php +++ b/Neos.ContentRepository.TestSuite/Classes/Behavior/Features/Bootstrap/GenericCommandExecutionAndEventPublication.php @@ -140,7 +140,7 @@ protected function publishEvent(string $eventType, StreamName $streamName, array ->getValue($eventPersister); $event = $eventNormalizer->denormalize($artificiallyConstructedEvent); - $eventPersister->publishEvents(new EventsToPublish( + $eventPersister->publishEvents($this->currentContentRepository, new EventsToPublish( $streamName, Events::with($event), ExpectedVersion::ANY() diff --git a/Neos.ContentRepositoryRegistry.PostgresDbalClient/Configuration/Settings.yaml b/Neos.ContentRepositoryRegistry.PostgresDbalClient/Configuration/Settings.yaml new file mode 100644 index 00000000000..a3956ff3a11 --- /dev/null +++ b/Neos.ContentRepositoryRegistry.PostgresDbalClient/Configuration/Settings.yaml @@ -0,0 +1,9 @@ +## uncomment the following lines in order to change the default implementation of the content graph projection +#Neos: +# ContentRepositoryRegistry: +# presets: +# 'default': +# projections: +# # NOTE: the following name should be stable, because there must only be a single content graph projection per content repository +# 'Neos.ContentRepository:ContentGraph': +# factoryObjectName: Neos\ContentGraph\PostgreSQLAdapter\HypergraphProjectionFactory diff --git a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php b/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php deleted file mode 100644 index f22557e0c21..00000000000 --- a/Neos.ContentRepositoryRegistry/Classes/Command/SubprocessProjectionCatchUpCommandController.php +++ /dev/null @@ -1,37 +0,0 @@ -> $projectionClassName fully qualified class name of the projection to catch up - * @internal - */ - public function catchupCommand(string $contentRepositoryIdentifier, string $projectionClassName): void - { - $contentRepository = $this->contentRepositoryRegistry->get(ContentRepositoryId::fromString($contentRepositoryIdentifier)); - $contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create()); - } -} diff --git a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php index 1fd7cc04910..8fde1f030be 100644 --- a/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php +++ b/Neos.ContentRepositoryRegistry/Classes/ContentRepositoryRegistry.php @@ -14,7 +14,6 @@ use Neos\ContentRepository\Core\Projection\CatchUpHookFactoryInterface; use Neos\ContentRepository\Core\Projection\ContentGraph\ContentSubgraphInterface; use Neos\ContentRepository\Core\Projection\ContentGraph\Node; -use Neos\ContentRepository\Core\Projection\ProjectionCatchUpTriggerInterface; use Neos\ContentRepository\Core\Projection\ProjectionFactoryInterface; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; use Neos\ContentRepository\Core\SharedModel\User\UserIdProviderInterface; @@ -24,7 +23,6 @@ use Neos\ContentRepositoryRegistry\Factory\ContentDimensionSource\ContentDimensionSourceFactoryInterface; use Neos\ContentRepositoryRegistry\Factory\EventStore\EventStoreFactoryInterface; use Neos\ContentRepositoryRegistry\Factory\NodeTypeManager\NodeTypeManagerFactoryInterface; -use Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\ProjectionCatchUpTriggerFactoryInterface; use Neos\ContentRepositoryRegistry\Factory\UserIdProvider\UserIdProviderFactoryInterface; use Neos\EventStore\EventStoreInterface; use Neos\Flow\Annotations as Flow; @@ -156,7 +154,6 @@ private function buildFactory(ContentRepositoryId $contentRepositoryId): Content $this->buildContentDimensionSource($contentRepositoryId, $contentRepositorySettings), $this->buildPropertySerializer($contentRepositoryId, $contentRepositorySettings), $this->buildProjectionsFactory($contentRepositoryId, $contentRepositorySettings), - $this->buildProjectionCatchUpTrigger($contentRepositoryId, $contentRepositorySettings), $this->buildUserIdProvider($contentRepositoryId, $contentRepositorySettings), $clock ); @@ -250,17 +247,6 @@ private function buildProjectionsFactory(ContentRepositoryId $contentRepositoryI return $projectionsFactory; } - /** @param array $contentRepositorySettings */ - private function buildProjectionCatchUpTrigger(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): ProjectionCatchUpTriggerInterface - { - isset($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']) || throw InvalidConfigurationException::fromMessage('Content repository "%s" does not have projectionCatchUpTrigger.factoryObjectName configured.', $contentRepositoryId->value); - $projectionCatchUpTriggerFactory = $this->objectManager->get($contentRepositorySettings['projectionCatchUpTrigger']['factoryObjectName']); - if (!$projectionCatchUpTriggerFactory instanceof ProjectionCatchUpTriggerFactoryInterface) { - throw InvalidConfigurationException::fromMessage('projectionCatchUpTrigger.factoryObjectName for content repository "%s" is not an instance of %s but %s.', $contentRepositoryId->value, ProjectionCatchUpTriggerFactoryInterface::class, get_debug_type($projectionCatchUpTriggerFactory)); - } - return $projectionCatchUpTriggerFactory->build($contentRepositoryId, $contentRepositorySettings['projectionCatchUpTrigger']['options'] ?? []); - } - /** @param array $contentRepositorySettings */ private function buildUserIdProvider(ContentRepositoryId $contentRepositoryId, array $contentRepositorySettings): UserIdProviderInterface { diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/EventStore/DoctrineEventStoreFactory.php b/Neos.ContentRepositoryRegistry/Classes/Factory/EventStore/DoctrineEventStoreFactory.php index f5874b41990..6b7748f5602 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/EventStore/DoctrineEventStoreFactory.php +++ b/Neos.ContentRepositoryRegistry/Classes/Factory/EventStore/DoctrineEventStoreFactory.php @@ -3,27 +3,43 @@ namespace Neos\ContentRepositoryRegistry\Factory\EventStore; -use Doctrine\DBAL\Connection; +use Doctrine\DBAL\DriverManager; +use Doctrine\ORM\EntityManagerInterface; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; use Neos\EventStore\DoctrineAdapter\DoctrineEventStore; -use Neos\EventStore\EventStoreInterface; use Psr\Clock\ClockInterface; class DoctrineEventStoreFactory implements EventStoreFactoryInterface { + /** + * @var array Runtime cache for created event store instances to prevent too many connections + */ + private static array $instances = []; + public function __construct( - private readonly Connection $connection, + private readonly EntityManagerInterface $entityManager, ) { } /** @param array $options */ - public function build(ContentRepositoryId $contentRepositoryId, array $options, ClockInterface $clock): EventStoreInterface + public function build(ContentRepositoryId $contentRepositoryId, array $options, ClockInterface $clock): DoctrineEventStore { - return new DoctrineEventStore( - $this->connection, - self::databaseTableName($contentRepositoryId), - $clock - ); + $dsn = $options['dsn'] ?? null; + $hash = md5($contentRepositoryId->value . '|' . $clock::class . '|' . $dsn); + if (!array_key_exists($hash, self::$instances)) { + if ($dsn !== null) { + $connection = DriverManager::getConnection(['url' => $dsn]); + } else { + // We create a new connection instance in order to avoid nested transactions + $connection = DriverManager::getConnection($this->entityManager->getConnection()->getParams(), $this->entityManager->getConfiguration(), $this->entityManager->getEventManager()); + } + self::$instances[$hash] = new DoctrineEventStore( + $connection, + self::databaseTableName($contentRepositoryId), + $clock + ); + } + return self::$instances[$hash]; } public static function databaseTableName(ContentRepositoryId $contentRepositoryId): string diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/CatchUpTriggerWithSynchronousOption.php b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/CatchUpTriggerWithSynchronousOption.php deleted file mode 100644 index d6b5342d1e9..00000000000 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/CatchUpTriggerWithSynchronousOption.php +++ /dev/null @@ -1,79 +0,0 @@ -contentRepositoryRegistry->get($this->contentRepositoryId); - foreach ($projections as $projection) { - $projectionClassName = get_class($projection); - $contentRepository->catchUpProjection($projectionClassName, CatchUpOptions::create()); - } - } else { - $this->inner->triggerCatchUp($projections); - } - } -} diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/ProjectionCatchUpTriggerFactoryInterface.php b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/ProjectionCatchUpTriggerFactoryInterface.php deleted file mode 100644 index 5f73b1859ad..00000000000 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/ProjectionCatchUpTriggerFactoryInterface.php +++ /dev/null @@ -1,12 +0,0 @@ - $options */ - public function build(ContentRepositoryId $contentRepositoryId, array $options): ProjectionCatchUpTriggerInterface; -} diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php deleted file mode 100644 index f72541368a4..00000000000 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTrigger.php +++ /dev/null @@ -1,43 +0,0 @@ - - */ - protected $flowSettings; - - public function __construct( - private readonly ContentRepositoryId $contentRepositoryId - ) { - } - - public function triggerCatchUp(Projections $projections): void - { - // modelled after https://github.com/neos/Neos.EventSourcing/blob/master/Classes/EventPublisher/JobQueueEventPublisher.php#L103 - // and https://github.com/Flowpack/jobqueue-common/blob/master/Classes/Queue/FakeQueue.php - foreach ($projections as $projection) { - Scripts::executeCommandAsync( - 'neos.contentrepositoryregistry:subprocessprojectioncatchup:catchup', - $this->flowSettings, - [ - 'contentRepositoryIdentifier' => $this->contentRepositoryId->value, - 'projectionClassName' => get_class($projection) - ] - ); - } - } -} diff --git a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTriggerFactory.php b/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTriggerFactory.php deleted file mode 100644 index 8695f517d82..00000000000 --- a/Neos.ContentRepositoryRegistry/Classes/Factory/ProjectionCatchUpTrigger/SubprocessProjectionCatchUpTriggerFactory.php +++ /dev/null @@ -1,22 +0,0 @@ - $options */ - public function build(ContentRepositoryId $contentRepositoryId, array $options): ProjectionCatchUpTriggerInterface - { - return new CatchUpTriggerWithSynchronousOption( - $contentRepositoryId, - new SubprocessProjectionCatchUpTrigger($contentRepositoryId) - ); - } -} diff --git a/Neos.ContentRepositoryRegistry/Classes/Service/EventMigrationService.php b/Neos.ContentRepositoryRegistry/Classes/Service/EventMigrationService.php index f865d275860..ba554527ad5 100644 --- a/Neos.ContentRepositoryRegistry/Classes/Service/EventMigrationService.php +++ b/Neos.ContentRepositoryRegistry/Classes/Service/EventMigrationService.php @@ -387,7 +387,6 @@ private static function decodeEventPayload(EventEnvelope $eventEnvelope): array private function updateEventPayload(SequenceNumber $sequenceNumber, array $payload): void { $eventTableName = DoctrineEventStoreFactory::databaseTableName($this->contentRepositoryId); - $this->connection->beginTransaction(); $this->connection->executeStatement( 'UPDATE ' . $eventTableName . ' SET payload=:payload WHERE sequencenumber=:sequenceNumber', [ @@ -395,7 +394,6 @@ private function updateEventPayload(SequenceNumber $sequenceNumber, array $paylo 'payload' => json_encode($payload), ] ); - $this->connection->commit(); $this->eventsModified[$sequenceNumber->value] = true; } @@ -405,7 +403,6 @@ private function updateEventPayload(SequenceNumber $sequenceNumber, array $paylo private function updateEventMetaData(SequenceNumber $sequenceNumber, array $eventMetaData): void { $eventTableName = DoctrineEventStoreFactory::databaseTableName($this->contentRepositoryId); - $this->connection->beginTransaction(); $this->connection->executeStatement( 'UPDATE ' . $eventTableName . ' SET metadata=:metadata WHERE sequencenumber=:sequenceNumber', [ @@ -413,7 +410,6 @@ private function updateEventMetaData(SequenceNumber $sequenceNumber, array $even 'metadata' => json_encode($eventMetaData), ] ); - $this->connection->commit(); $this->eventsModified[$sequenceNumber->value] = true; } diff --git a/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml b/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml deleted file mode 100644 index 19b4e1c3b8f..00000000000 --- a/Neos.ContentRepositoryRegistry/Configuration/Caches.yaml +++ /dev/null @@ -1,5 +0,0 @@ -Neos_ContentGraph_DoctrineDbalAdapter_ProcessedEvents: - frontend: Neos\Cache\Frontend\VariableFrontend - backend: Neos\Cache\Backend\FileBackend - backendOptions: - defaultLifetime: 400 diff --git a/Neos.ContentRepositoryRegistry/Configuration/Settings.yaml b/Neos.ContentRepositoryRegistry/Configuration/Settings.yaml index 355f22e13d2..2f891049e10 100644 --- a/Neos.ContentRepositoryRegistry/Configuration/Settings.yaml +++ b/Neos.ContentRepositoryRegistry/Configuration/Settings.yaml @@ -31,9 +31,6 @@ Neos: contentDimensionSource: factoryObjectName: Neos\ContentRepositoryRegistry\Factory\ContentDimensionSource\ConfigurationBasedContentDimensionSourceFactory - projectionCatchUpTrigger: - factoryObjectName: Neos\ContentRepositoryRegistry\Factory\ProjectionCatchUpTrigger\SubprocessProjectionCatchUpTriggerFactory - userIdProvider: factoryObjectName: Neos\ContentRepositoryRegistry\Factory\UserIdProvider\StaticUserIdProviderFactory diff --git a/Neos.Fusion/Classes/Core/RuntimeConfiguration.php b/Neos.Fusion/Classes/Core/RuntimeConfiguration.php index 364ddec5e46..fa2d08be720 100644 --- a/Neos.Fusion/Classes/Core/RuntimeConfiguration.php +++ b/Neos.Fusion/Classes/Core/RuntimeConfiguration.php @@ -68,6 +68,7 @@ public function forPath(string $fusionPath): array if ($fusionPath === '') { throw new Exception('Fusion path cannot be empty.', 1695308681); } + $currentPrototypeDefinitions = []; // Find longest prefix of path that already is in path cache $pathUntilNow = ''; diff --git a/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageProjection.php b/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageProjection.php index ec762433ff9..59c075cf595 100644 --- a/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageProjection.php +++ b/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageProjection.php @@ -5,6 +5,7 @@ namespace Neos\Neos\AssetUsage\Projection; use Doctrine\DBAL\Connection; +use Doctrine\DBAL\Schema\AbstractSchemaManager; use Doctrine\ORM\Exception\ORMException; use Neos\ContentRepository\Core\EventStore\EventInterface; use Neos\ContentRepository\Core\Feature\ContentStreamForking\Event\ContentStreamWasForked; @@ -19,11 +20,12 @@ use Neos\ContentRepository\Core\Feature\WorkspacePublication\Event\WorkspaceWasPartiallyPublished; use Neos\ContentRepository\Core\Feature\WorkspacePublication\Event\WorkspaceWasPublished; use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Event\WorkspaceWasRebased; -use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage; -use Neos\ContentRepository\Core\Projection\CheckpointStorageStatusType; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\ProjectionStatus; use Neos\ContentRepository\Core\SharedModel\ContentRepository\ContentRepositoryId; +use Neos\ContentRepository\DbalTools\CheckpointHelper; +use Neos\ContentRepository\DbalTools\DbalSchemaDiff; +use Neos\ContentRepository\DbalTools\DbalSchemaFactory; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; use Neos\Media\Domain\Model\AssetInterface; @@ -44,29 +46,22 @@ final class AssetUsageProjection implements ProjectionInterface { private ?AssetUsageFinder $stateAccessor = null; private AssetUsageRepository $repository; - private DbalCheckpointStorage $checkpointStorage; /** @var array */ private array $originalAssetIdMappingRuntimeCache = []; public function __construct( private readonly AssetRepository $assetRepository, ContentRepositoryId $contentRepositoryId, - Connection $dbal, + private readonly Connection $dbal, AssetUsageRepositoryFactory $assetUsageRepositoryFactory, ) { $this->repository = $assetUsageRepositoryFactory->build($contentRepositoryId); - $this->checkpointStorage = new DbalCheckpointStorage( - $dbal, - $this->repository->getTableNamePrefix() . '_checkpoint', - self::class - ); } public function reset(): void { $this->repository->reset(); - $this->checkpointStorage->acquireLock(); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::none()); + CheckpointHelper::resetCheckpoint($this->dbal, $this->repository->getTableNamePrefix() . '_checkpoint'); } public function whenNodeAggregateWithNodeWasCreated(NodeAggregateWithNodeWasCreated $event, EventEnvelope $eventEnvelope): void @@ -230,18 +225,13 @@ private function extractAssetIds(string $type, mixed $value): array public function setUp(): void { $this->repository->setUp(); - $this->checkpointStorage->setUp(); + foreach ($this->determineRequiredSqlStatements() as $statement) { + $this->dbal->executeStatement($statement); + } } public function status(): ProjectionStatus { - $checkpointStorageStatus = $this->checkpointStorage->status(); - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) { - return ProjectionStatus::error($checkpointStorageStatus->details); - } - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) { - return ProjectionStatus::setupRequired($checkpointStorageStatus->details); - } try { $falseOrDetailsString = $this->repository->isSetupRequired(); if (is_string($falseOrDetailsString)) { @@ -250,28 +240,25 @@ public function status(): ProjectionStatus } catch (\Throwable $e) { return ProjectionStatus::error(sprintf('Failed to determine required SQL statements: %s', $e->getMessage())); } + try { + $requiredSqlStatements = $this->determineRequiredSqlStatements(); + } catch (\Throwable $e) { + return ProjectionStatus::error(sprintf('Failed to determine required SQL statements: %s', $e->getMessage())); + } + if ($requiredSqlStatements !== []) { + return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements))); + } + try { + $this->getCheckpoint(); + } catch (\Exception $exception) { + return ProjectionStatus::error('Error while retrieving checkpoint: ' . $exception->getMessage()); + } return ProjectionStatus::ok(); } - public function canHandle(EventInterface $event): bool - { - return in_array($event::class, [ - NodeAggregateWithNodeWasCreated::class, - NodePropertiesWereSet::class, - NodeAggregateWasRemoved::class, - NodePeerVariantWasCreated::class, - ContentStreamWasForked::class, - WorkspaceWasDiscarded::class, - WorkspaceWasPartiallyDiscarded::class, - WorkspaceWasPartiallyPublished::class, - WorkspaceWasPublished::class, - WorkspaceWasRebased::class, - ContentStreamWasRemoved::class, - ]); - } - public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void { + $this->dbal->beginTransaction(); match ($event::class) { NodeAggregateWithNodeWasCreated::class => $this->whenNodeAggregateWithNodeWasCreated($event, $eventEnvelope), NodePropertiesWereSet::class => $this->whenNodePropertiesWereSet($event, $eventEnvelope), @@ -284,13 +271,15 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void WorkspaceWasPublished::class => $this->whenWorkspaceWasPublished($event), WorkspaceWasRebased::class => $this->whenWorkspaceWasRebased($event), ContentStreamWasRemoved::class => $this->whenContentStreamWasRemoved($event), - default => throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))), + default => null, }; + CheckpointHelper::updateCheckpoint($this->dbal, $this->repository->getTableNamePrefix() . '_checkpoint', $eventEnvelope->sequenceNumber); + $this->dbal->commit(); } - public function getCheckpointStorage(): DbalCheckpointStorage + public function getCheckpoint(): SequenceNumber { - return $this->checkpointStorage; + return CheckpointHelper::getCheckpoint($this->dbal, $this->repository->getTableNamePrefix() . '_checkpoint'); } public function getState(): AssetUsageFinder @@ -301,6 +290,22 @@ public function getState(): AssetUsageFinder return $this->stateAccessor; } + /** + * @return array + */ + private function determineRequiredSqlStatements(): array + { + $schemaManager = $this->dbal->getSchemaManager(); + if (!$schemaManager instanceof AbstractSchemaManager) { + throw new \RuntimeException('Failed to retrieve Schema Manager', 1625653914); + } + $checkpointTable = CheckpointHelper::checkpointTableSchema($this->repository->getTableNamePrefix() . '_checkpoint'); + + $schema = DbalSchemaFactory::createSchemaWithTables($schemaManager, [$checkpointTable]); + return DbalSchemaDiff::determineRequiredSqlStatements($this->dbal, $schema); + } + + private function findOriginalAssetId(string $assetId): ?string { if (!array_key_exists($assetId, $this->originalAssetIdMappingRuntimeCache)) { diff --git a/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageRepository.php b/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageRepository.php index 3de07925cc1..b83a0ebf447 100644 --- a/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageRepository.php +++ b/Neos.Neos/Classes/AssetUsage/Projection/AssetUsageRepository.php @@ -17,8 +17,8 @@ use Doctrine\DBAL\Types\Types; use Neos\ContentRepository\Core\DimensionSpace\DimensionSpacePointSet; use Neos\ContentRepository\Core\DimensionSpace\OriginDimensionSpacePoint; -use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff; -use Neos\ContentRepository\Core\Infrastructure\DbalSchemaFactory; +use Neos\ContentRepository\DbalTools\DbalSchemaDiff; +use Neos\ContentRepository\DbalTools\DbalSchemaFactory; use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; use Neos\Neos\AssetUsage\Dto\AssetIdAndOriginalAssetId; diff --git a/Neos.Neos/Classes/FrontendRouting/Projection/DocumentUriPathProjection.php b/Neos.Neos/Classes/FrontendRouting/Projection/DocumentUriPathProjection.php index f6269e7cba3..2177ce11f6c 100644 --- a/Neos.Neos/Classes/FrontendRouting/Projection/DocumentUriPathProjection.php +++ b/Neos.Neos/Classes/FrontendRouting/Projection/DocumentUriPathProjection.php @@ -26,15 +26,14 @@ use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasTagged; use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasUntagged; use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Event\RootWorkspaceWasCreated; -use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage; -use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff; use Neos\ContentRepository\Core\NodeType\NodeTypeManager; use Neos\ContentRepository\Core\NodeType\NodeTypeName; -use Neos\ContentRepository\Core\Projection\CheckpointStorageStatusType; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\ProjectionStatus; use Neos\ContentRepository\Core\Projection\WithMarkStaleInterface; use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId; +use Neos\ContentRepository\DbalTools\CheckpointHelper; +use Neos\ContentRepository\DbalTools\DbalSchemaDiff; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; use Neos\Neos\Domain\Model\SiteNodeName; @@ -50,7 +49,6 @@ final class DocumentUriPathProjection implements ProjectionInterface, WithMarkSt 'shortcutTarget' => Types::JSON, ]; - private DbalCheckpointStorage $checkpointStorage; private ?DocumentUriPathFinder $stateAccessor = null; /** @@ -63,11 +61,6 @@ public function __construct( private readonly Connection $dbal, private readonly string $tableNamePrefix, ) { - $this->checkpointStorage = new DbalCheckpointStorage( - $this->dbal, - $this->tableNamePrefix . '_checkpoint', - self::class - ); } public function setUp(): void @@ -75,18 +68,10 @@ public function setUp(): void foreach ($this->determineRequiredSqlStatements() as $statement) { $this->dbal->executeStatement($statement); } - $this->checkpointStorage->setUp(); } public function status(): ProjectionStatus { - $checkpointStorageStatus = $this->checkpointStorage->status(); - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) { - return ProjectionStatus::error($checkpointStorageStatus->details); - } - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) { - return ProjectionStatus::setupRequired($checkpointStorageStatus->details); - } try { $this->dbal->connect(); } catch (\Throwable $e) { @@ -100,6 +85,11 @@ public function status(): ProjectionStatus if ($requiredSqlStatements !== []) { return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements))); } + try { + $this->getCheckpoint(); + } catch (\Exception $exception) { + return ProjectionStatus::error('Error while retrieving checkpoint: ' . $exception->getMessage()); + } return ProjectionStatus::ok(); } @@ -120,8 +110,7 @@ private function determineRequiredSqlStatements(): array public function reset(): void { $this->truncateDatabaseTables(); - $this->checkpointStorage->acquireLock(); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::none()); + CheckpointHelper::resetCheckpoint($this->dbal, $this->tableNamePrefix . '_checkpoint'); $this->stateAccessor = null; } @@ -135,30 +124,9 @@ private function truncateDatabaseTables(): void } } - - public function canHandle(EventInterface $event): bool - { - return in_array($event::class, [ - RootWorkspaceWasCreated::class, - RootNodeAggregateWithNodeWasCreated::class, - RootNodeAggregateDimensionsWereUpdated::class, - NodeAggregateWithNodeWasCreated::class, - NodeAggregateTypeWasChanged::class, - NodePeerVariantWasCreated::class, - NodeGeneralizationVariantWasCreated::class, - NodeSpecializationVariantWasCreated::class, - SubtreeWasTagged::class, - SubtreeWasUntagged::class, - NodeAggregateWasRemoved::class, - NodePropertiesWereSet::class, - NodeAggregateWasMoved::class, - DimensionSpacePointWasMoved::class, - DimensionShineThroughWasAdded::class, - ]); - } - public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void { + $this->dbal->beginTransaction(); match ($event::class) { RootWorkspaceWasCreated::class => $this->whenRootWorkspaceWasCreated($event), RootNodeAggregateWithNodeWasCreated::class => $this->whenRootNodeAggregateWithNodeWasCreated($event), @@ -175,13 +143,15 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void NodeAggregateWasMoved::class => $this->whenNodeAggregateWasMoved($event), DimensionSpacePointWasMoved::class => $this->whenDimensionSpacePointWasMoved($event), DimensionShineThroughWasAdded::class => $this->whenDimensionShineThroughWasAdded($event), - default => throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))), + default => null, }; + CheckpointHelper::updateCheckpoint($this->dbal, $this->tableNamePrefix . '_checkpoint', $eventEnvelope->sequenceNumber); + $this->dbal->commit(); } - public function getCheckpointStorage(): DbalCheckpointStorage + public function getCheckpoint(): SequenceNumber { - return $this->checkpointStorage; + return CheckpointHelper::getCheckpoint($this->dbal, $this->tableNamePrefix . '_checkpoint'); } public function getState(): DocumentUriPathFinder diff --git a/Neos.Neos/Classes/FrontendRouting/Projection/DocumentUriPathSchemaBuilder.php b/Neos.Neos/Classes/FrontendRouting/Projection/DocumentUriPathSchemaBuilder.php index aa074353049..187c239c835 100644 --- a/Neos.Neos/Classes/FrontendRouting/Projection/DocumentUriPathSchemaBuilder.php +++ b/Neos.Neos/Classes/FrontendRouting/Projection/DocumentUriPathSchemaBuilder.php @@ -8,7 +8,8 @@ use Doctrine\DBAL\Schema\Table; use Doctrine\DBAL\Types\Type; use Doctrine\DBAL\Types\Types; -use Neos\ContentRepository\Core\Infrastructure\DbalSchemaFactory; +use Neos\ContentRepository\DbalTools\CheckpointHelper; +use Neos\ContentRepository\DbalTools\DbalSchemaFactory; class DocumentUriPathSchemaBuilder { @@ -23,7 +24,8 @@ public function buildSchema(AbstractSchemaManager $schemaManager): Schema { $schema = DbalSchemaFactory::createSchemaWithTables($schemaManager, [ $this->createUriTable(), - $this->createLiveContentStreamsTable() + $this->createLiveContentStreamsTable(), + CheckpointHelper::checkpointTableSchema($this->tableNamePrefix . '_checkpoint'), ]); return $schema; diff --git a/Neos.Neos/Classes/Fusion/Cache/GraphProjectorCatchUpHookForCacheFlushing.php b/Neos.Neos/Classes/Fusion/Cache/GraphProjectorCatchUpHookForCacheFlushing.php index c71a2965ca2..2efd26a30ed 100644 --- a/Neos.Neos/Classes/Fusion/Cache/GraphProjectorCatchUpHookForCacheFlushing.php +++ b/Neos.Neos/Classes/Fusion/Cache/GraphProjectorCatchUpHookForCacheFlushing.php @@ -28,9 +28,7 @@ * is not needed: * * By calling {@see self::disabled(\Closure)} in your code, all projection updates - * will never trigger catch up hooks. This will only work when - * {@see CatchUpTriggerWithSynchronousOption::synchronously()} is called, - * as otherwise this subprocess won't be called. + * will never trigger catch up hooks. * * * The following scenario explains how to think about cache flushing. diff --git a/Neos.Neos/Classes/PendingChangesProjection/ChangeProjection.php b/Neos.Neos/Classes/PendingChangesProjection/ChangeProjection.php index 574354ca99f..ea2afa54f36 100644 --- a/Neos.Neos/Classes/PendingChangesProjection/ChangeProjection.php +++ b/Neos.Neos/Classes/PendingChangesProjection/ChangeProjection.php @@ -36,14 +36,13 @@ use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasTagged; use Neos\ContentRepository\Core\Feature\SubtreeTagging\Event\SubtreeWasUntagged; use Neos\ContentRepository\Core\Feature\WorkspaceCreation\Event\RootWorkspaceWasCreated; -use Neos\ContentRepository\Core\Infrastructure\DbalCheckpointStorage; -use Neos\ContentRepository\Core\Infrastructure\DbalSchemaDiff; -use Neos\ContentRepository\Core\Infrastructure\DbalSchemaFactory; -use Neos\ContentRepository\Core\Projection\CheckpointStorageStatusType; use Neos\ContentRepository\Core\Projection\ProjectionInterface; use Neos\ContentRepository\Core\Projection\ProjectionStatus; use Neos\ContentRepository\Core\SharedModel\Node\NodeAggregateId; use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId; +use Neos\ContentRepository\DbalTools\CheckpointHelper; +use Neos\ContentRepository\DbalTools\DbalSchemaDiff; +use Neos\ContentRepository\DbalTools\DbalSchemaFactory; use Neos\EventStore\Model\Event\SequenceNumber; use Neos\EventStore\Model\EventEnvelope; @@ -66,17 +65,11 @@ class ChangeProjection implements ProjectionInterface * @var array|null */ private ?array $liveContentStreamIdsRuntimeCache = null; - private DbalCheckpointStorage $checkpointStorage; public function __construct( private readonly Connection $dbal, private readonly string $tableNamePrefix, ) { - $this->checkpointStorage = new DbalCheckpointStorage( - $this->dbal, - $this->tableNamePrefix . '_checkpoint', - self::class - ); } @@ -85,18 +78,10 @@ public function setUp(): void foreach ($this->determineRequiredSqlStatements() as $statement) { $this->dbal->executeStatement($statement); } - $this->checkpointStorage->setUp(); } public function status(): ProjectionStatus { - $checkpointStorageStatus = $this->checkpointStorage->status(); - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::ERROR) { - return ProjectionStatus::error($checkpointStorageStatus->details); - } - if ($checkpointStorageStatus->type === CheckpointStorageStatusType::SETUP_REQUIRED) { - return ProjectionStatus::setupRequired($checkpointStorageStatus->details); - } try { $this->dbal->connect(); } catch (\Throwable $e) { @@ -110,6 +95,11 @@ public function status(): ProjectionStatus if ($requiredSqlStatements !== []) { return ProjectionStatus::setupRequired(sprintf('The following SQL statement%s required: %s', count($requiredSqlStatements) !== 1 ? 's are' : ' is', implode(chr(10), $requiredSqlStatements))); } + try { + $this->getCheckpoint(); + } catch (\Exception $exception) { + return ProjectionStatus::error('Error while retrieving checkpoint: ' . $exception->getMessage()); + } return ProjectionStatus::ok(); } @@ -118,8 +108,7 @@ public function status(): ProjectionStatus */ private function determineRequiredSqlStatements(): array { - $connection = $this->dbal; - $schemaManager = $connection->getSchemaManager(); + $schemaManager = $this->dbal->getSchemaManager(); if (!$schemaManager instanceof AbstractSchemaManager) { throw new \RuntimeException('Failed to retrieve Schema Manager', 1625653914); } @@ -149,38 +138,22 @@ private function determineRequiredSqlStatements(): array ]); $liveContentStreamsTable->setPrimaryKey(['contentstreamid']); - $schema = DbalSchemaFactory::createSchemaWithTables($schemaManager, [$changeTable, $liveContentStreamsTable]); - return DbalSchemaDiff::determineRequiredSqlStatements($connection, $schema); + $checkpointTable = CheckpointHelper::checkpointTableSchema($this->tableNamePrefix . '_checkpoint'); + + $schema = DbalSchemaFactory::createSchemaWithTables($schemaManager, [$changeTable, $liveContentStreamsTable, $checkpointTable]); + return DbalSchemaDiff::determineRequiredSqlStatements($this->dbal, $schema); } public function reset(): void { $this->dbal->exec('TRUNCATE ' . $this->tableNamePrefix); $this->dbal->exec('TRUNCATE ' . $this->tableNamePrefix . '_livecontentstreams'); - $this->checkpointStorage->acquireLock(); - $this->checkpointStorage->updateAndReleaseLock(SequenceNumber::none()); - } - - public function canHandle(EventInterface $event): bool - { - return in_array($event::class, [ - RootWorkspaceWasCreated::class, - NodeAggregateWasMoved::class, - NodePropertiesWereSet::class, - NodeReferencesWereSet::class, - NodeAggregateWithNodeWasCreated::class, - SubtreeWasTagged::class, - SubtreeWasUntagged::class, - NodeAggregateWasRemoved::class, - DimensionSpacePointWasMoved::class, - NodeGeneralizationVariantWasCreated::class, - NodeSpecializationVariantWasCreated::class, - NodePeerVariantWasCreated::class - ]); + CheckpointHelper::resetCheckpoint($this->dbal, $this->tableNamePrefix . '_checkpoint'); } public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void { + $this->dbal->beginTransaction(); match ($event::class) { RootWorkspaceWasCreated::class => $this->whenRootWorkspaceWasCreated($event), NodeAggregateWasMoved::class => $this->whenNodeAggregateWasMoved($event), @@ -194,13 +167,15 @@ public function apply(EventInterface $event, EventEnvelope $eventEnvelope): void NodeSpecializationVariantWasCreated::class => $this->whenNodeSpecializationVariantWasCreated($event), NodeGeneralizationVariantWasCreated::class => $this->whenNodeGeneralizationVariantWasCreated($event), NodePeerVariantWasCreated::class => $this->whenNodePeerVariantWasCreated($event), - default => throw new \InvalidArgumentException(sprintf('Unsupported event %s', get_debug_type($event))), + default => null, }; + CheckpointHelper::updateCheckpoint($this->dbal, $this->tableNamePrefix . '_checkpoint', $eventEnvelope->sequenceNumber); + $this->dbal->commit(); } - public function getCheckpointStorage(): DbalCheckpointStorage + public function getCheckpoint(): SequenceNumber { - return $this->checkpointStorage; + return CheckpointHelper::getCheckpoint($this->dbal, $this->tableNamePrefix . '_checkpoint'); } public function getState(): ChangeFinder diff --git a/Neos.Neos/composer.json b/Neos.Neos/composer.json index 0f61b1d8764..996a3c0657d 100644 --- a/Neos.Neos/composer.json +++ b/Neos.Neos/composer.json @@ -17,6 +17,7 @@ "neos/media-browser": "self.version", "neos/party": "~7.0.3", "neos/contentrepository-core": "self.version", + "neos/contentrepository-dbaltools": "*", "neos/contentrepositoryregistry": "self.version", "neos/contentrepository-nodeaccess": "self.version", "neos/contentrepository-export": "self.version", diff --git a/composer.json b/composer.json index f6d8da6e73e..69a9f2e8b7d 100644 --- a/composer.json +++ b/composer.json @@ -8,17 +8,17 @@ "require": { "neos/flow-development-collection": "9.0.x-dev", "doctrine/dbal": "^2.8", - "doctrine/migrations": "*", + "php": "^8.2", "neos/eventstore": "^1", "neos/eventstore-doctrineadapter": "^1 || ^2", - "php": "^8.2", "neos/error-messages": "*", "neos/utility-objecthandling": "*", "neos/utility-arrays": "*", "symfony/serializer": "^6.3", "psr/clock": "^1", "behat/transliterator": "~1.0", - "ramsey/uuid": "^3.0 || ^4.0", + "ramsey/uuid": "^3 || ^4", + "symfony/lock": "^6", "league/flysystem": "^3", "webmozart/assert": "^1.11", "neos/flow": "*", @@ -58,6 +58,7 @@ "neos/contentgraph-postgresqladapter": "self.version", "neos/contentrepository-behavioraltests": "self.version", "neos/contentrepository-core": "self.version", + "neos/contentrepository-dbaltools": "self.version", "neos/contentrepository-export": "self.version", "neos/contentrepository-legacynodemigration": "self.version", "neos/contentrepository-nodeaccess": "self.version", @@ -125,7 +126,6 @@ "@test:behat-cli -c Neos.ContentRepository.LegacyNodeMigration/Tests/Behavior/behat.yml.dist", "@test:behat-cli -c Neos.ContentRepository.Export/Tests/Behavior/behat.yml.dist", "@test:behat-cli -c Neos.TimeableNodeVisibility/Tests/Behavior/behat.yml.dist", - "../../flow doctrine:migrate --quiet; ../../flow cr:setup", "@test:behat-cli -c Neos.Neos/Tests/Behavior/behat.yml" ], "test:behavioral:stop-on-failure": [ @@ -134,7 +134,6 @@ "@test:behat-cli -vvv --stop-on-failure -c Neos.ContentRepository.LegacyNodeMigration/Tests/Behavior/behat.yml.dist", "@test:behat-cli -vvv --stop-on-failure -c Neos.ContentRepository.Export/Tests/Behavior/behat.yml.dist", "@test:behat-cli -vvv --stop-on-failure -c Neos.TimeableNodeVisibility/Tests/Behavior/behat.yml.dist", - "../../flow doctrine:migrate --quiet; ../../flow cr:setup", "@test:behat-cli -vvv --stop-on-failure -c Neos.Neos/Tests/Behavior/behat.yml" ], "test": [ @@ -159,6 +158,9 @@ "Neos\\ContentRepository\\Core\\": [ "Neos.ContentRepository.Core/Classes" ], + "Neos\\ContentRepository\\DbalTools\\": [ + "Neos.ContentRepository.DbalTools/Classes" + ], "Neos\\ContentRepository\\Export\\": [ "Neos.ContentRepository.Export/src/" ], @@ -256,6 +258,9 @@ "Neos\\ContentRepository\\Core\\Tests\\": [ "Neos.ContentRepository.Core/Tests" ], + "Neos\\ContentRepository\\DbalTools\\Tests\\": [ + "Neos.ContentRepository.DbalTools/Tests" + ], "Neos\\ContentRepository\\LegacyNodeMigration\\Tests\\": [ "Neos.ContentRepository.LegacyNodeMigration/Tests" ], diff --git a/phpstan.neon.dist b/phpstan.neon.dist index c157d6569f9..9d89d3089f2 100644 --- a/phpstan.neon.dist +++ b/phpstan.neon.dist @@ -9,6 +9,7 @@ parameters: - Neos.ContentGraph.PostgreSQLAdapter/src - Neos.ContentRepository.BehavioralTests/Classes - Neos.ContentRepository.Core/Classes + - Neos.ContentRepository.DbalTools - Neos.ContentRepository.Export/src - Neos.ContentRepository.LegacyNodeMigration/Classes - Neos.ContentRepository.NodeAccess/Classes @@ -16,6 +17,9 @@ parameters: - Neos.ContentRepository.StructureAdjustment/src - Neos.ContentRepository.TestSuite/Classes - Neos.ContentRepositoryRegistry/Classes + - Neos.ContentRepositoryRegistry.DoctrineDbalClient + - Neos.ContentRepositoryRegistry.PostgresDbalClient + - Neos.ContentRepositoryRegistry.TestSuite - Neos.Neos/Classes - Neos.TimeableNodeVisibility/Classes - Neos.NodeTypes.Form/Classes