From c6c0be4048add3564343a6e66a6beef21133181e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yann=20Eugon=C3=A9?= Date: Sun, 11 Jul 2021 12:27:00 +0200 Subject: [PATCH 1/3] Introduce CallbackProcessor --- .../Job/Item/Processor/CallbackProcessor.php | 23 +++++++++++++++++++ .../Item/Processor/CallbackProcessorTest.php | 19 +++++++++++++++ 2 files changed, 42 insertions(+) create mode 100644 src/batch/src/Job/Item/Processor/CallbackProcessor.php create mode 100644 src/batch/tests/Job/Item/Processor/CallbackProcessorTest.php diff --git a/src/batch/src/Job/Item/Processor/CallbackProcessor.php b/src/batch/src/Job/Item/Processor/CallbackProcessor.php new file mode 100644 index 00000000..c5277262 --- /dev/null +++ b/src/batch/src/Job/Item/Processor/CallbackProcessor.php @@ -0,0 +1,23 @@ +callback = $callback; + } + + public function process($item) + { + return ($this->callback)($item); + } +} diff --git a/src/batch/tests/Job/Item/Processor/CallbackProcessorTest.php b/src/batch/tests/Job/Item/Processor/CallbackProcessorTest.php new file mode 100644 index 00000000..e676a497 --- /dev/null +++ b/src/batch/tests/Job/Item/Processor/CallbackProcessorTest.php @@ -0,0 +1,19 @@ + \mb_strtolower($item)); + + self::assertSame('john', $processor->process('John')); + self::assertSame('doe', $processor->process('DOE')); + } +} From 42839cc0c8f71ba673aaf9075142fbe69517ba72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yann=20Eugon=C3=A9?= Date: Sun, 11 Jul 2021 12:27:32 +0200 Subject: [PATCH 2/3] Introduce InMemoryWriter for testing purpose --- .../Test/Job/Item/Writer/InMemoryWriter.php | 30 +++++++++++++++++++ .../Job/Item/Writer/InMemoryWriterTest.php | 26 ++++++++++++++++ 2 files changed, 56 insertions(+) create mode 100644 src/batch/src/Test/Job/Item/Writer/InMemoryWriter.php create mode 100644 src/batch/tests/Test/Job/Item/Writer/InMemoryWriterTest.php diff --git a/src/batch/src/Test/Job/Item/Writer/InMemoryWriter.php b/src/batch/src/Test/Job/Item/Writer/InMemoryWriter.php new file mode 100644 index 00000000..184660ae --- /dev/null +++ b/src/batch/src/Test/Job/Item/Writer/InMemoryWriter.php @@ -0,0 +1,30 @@ +items = []; + } + + public function write(iterable $items): void + { + foreach ($items as $item) { + $this->items[] = $item; + } + } + + public function getItems(): array + { + return $this->items; + } +} diff --git a/src/batch/tests/Test/Job/Item/Writer/InMemoryWriterTest.php b/src/batch/tests/Test/Job/Item/Writer/InMemoryWriterTest.php new file mode 100644 index 00000000..17940436 --- /dev/null +++ b/src/batch/tests/Test/Job/Item/Writer/InMemoryWriterTest.php @@ -0,0 +1,26 @@ +getItems()); + $writer->initialize(); + self::assertSame([], $writer->getItems()); + $writer->write([1, 2, 3]); + self::assertSame([1, 2, 3], $writer->getItems()); + $writer->write([4, 5, 6]); + self::assertSame([1, 2, 3, 4, 5, 6], $writer->getItems()); + $writer->initialize(); + $writer->write([7, 8, 9]); + self::assertSame([7, 8, 9], $writer->getItems()); + } +} From 85af371f123aabfb9457d0b9710c120bbe3ec348 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yann=20Eugon=C3=A9?= Date: Sun, 11 Jul 2021 12:31:09 +0200 Subject: [PATCH 3/3] Introduce ExpandProcessedItem class in order to allow writing multiple items for a read item --- .../src/Job/Item/ExpandProcessedItem.php | 23 +++++++++++ src/batch/src/Job/Item/ItemJob.php | 38 ++++++++++++++----- src/batch/tests/Job/Item/ItemJobTest.php | 32 ++++++++++++++++ 3 files changed, 83 insertions(+), 10 deletions(-) create mode 100644 src/batch/src/Job/Item/ExpandProcessedItem.php diff --git a/src/batch/src/Job/Item/ExpandProcessedItem.php b/src/batch/src/Job/Item/ExpandProcessedItem.php new file mode 100644 index 00000000..df6e9b16 --- /dev/null +++ b/src/batch/src/Job/Item/ExpandProcessedItem.php @@ -0,0 +1,23 @@ +increment('processed'); - $itemsToWrite[] = $processedItem; - $writeCount++; - - if (0 === $writeCount % $this->batchSize) { - $this->writer->write($itemsToWrite); - $summary->increment('write', $writeCount); - $itemsToWrite = []; - $writeCount = 0; - - $this->executionStorage->store($rootExecution); + foreach ($this->getItemsToWrite($processedItem) as $item) { + $itemsToWrite[] = $item; + $writeCount++; + + if (0 === $writeCount % $this->batchSize) { + $this->writer->write($itemsToWrite); + $summary->increment('write', $writeCount); + $itemsToWrite = []; + $writeCount = 0; + + $this->executionStorage->store($rootExecution); + } } } @@ -118,6 +120,22 @@ protected function doExecute(JobExecution $jobExecution): void $this->flushElements(); } + /** + * Analyse processed item to determine the items to write. + * + * @param mixed $processedItem The processed item + * + * @return iterable A list of items to write + */ + protected function getItemsToWrite($processedItem): iterable + { + if ($processedItem instanceof ExpandProcessedItem) { + return $processedItem; + } + + return [$processedItem]; + } + /** * Set up elements before execution. * diff --git a/src/batch/tests/Job/Item/ItemJobTest.php b/src/batch/tests/Job/Item/ItemJobTest.php index 9317e662..140fe32a 100644 --- a/src/batch/tests/Job/Item/ItemJobTest.php +++ b/src/batch/tests/Job/Item/ItemJobTest.php @@ -9,6 +9,7 @@ use Prophecy\Argument; use Prophecy\PhpUnit\ProphecyTrait; use Prophecy\Prophecy\ObjectProphecy; +use Yokai\Batch\Job\Item\ExpandProcessedItem; use Yokai\Batch\Job\Item\FlushableInterface; use Yokai\Batch\Job\Item\InitializableInterface; use Yokai\Batch\Job\Item\InvalidItemException; @@ -16,6 +17,8 @@ use Yokai\Batch\Job\Item\ItemProcessorInterface; use Yokai\Batch\Job\Item\ItemReaderInterface; use Yokai\Batch\Job\Item\ItemWriterInterface; +use Yokai\Batch\Job\Item\Processor\CallbackProcessor; +use Yokai\Batch\Job\Item\Reader\StaticIterableReader; use Yokai\Batch\Job\JobExecutionAwareInterface; use Yokai\Batch\Job\JobParametersAwareInterface; use Yokai\Batch\Job\SummaryAwareInterface; @@ -23,6 +26,7 @@ use Yokai\Batch\JobParameters; use Yokai\Batch\Storage\NullJobExecutionStorage; use Yokai\Batch\Summary; +use Yokai\Batch\Test\Job\Item\Writer\InMemoryWriter; use Yokai\Batch\Tests\Util; class ItemJobTest extends TestCase @@ -104,6 +108,34 @@ public function testExecute(): void self::assertEquals($expectedLogs, $log); } + public function testWithExpandItem(): void + { + $job = new ItemJob( + 4, + new StaticIterableReader(['eggplant', 'tomato', 'avocado']), + new CallbackProcessor(fn($item) => new ExpandProcessedItem(['fruit:' . $item, 'vegetable:' . $item])), + $writer = new InMemoryWriter(), + new NullJobExecutionStorage() + ); + + $job->execute($execution = JobExecution::createRoot('123456', 'testing')); + + self::assertSame( + [ + 'fruit:eggplant', + 'vegetable:eggplant', + 'fruit:tomato', + 'vegetable:tomato', + 'fruit:avocado', + 'vegetable:avocado', + ], + $writer->getItems() + ); + self::assertSame(3, $execution->getSummary()->get('read')); + self::assertSame(3, $execution->getSummary()->get('processed')); + self::assertSame(6, $execution->getSummary()->get('write')); + } + private function configureItemElement(ObjectProphecy $element, string $role, string &$log): void { /** @var ObjectProphecy|JobExecutionAwareInterface|JobParametersAwareInterface|SummaryAwareInterface|InitializableInterface|FlushableInterface $element */