Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/batch/src/Job/Item/ExpandProcessedItem.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

declare(strict_types=1);

namespace Yokai\Batch\Job\Item;

use ArrayIterator;
use IteratorIterator;

/**
* A processor may return an element of this class
* in order to write multiple items per item read.
*/
final class ExpandProcessedItem extends IteratorIterator
{
public function __construct(iterable $iterator, string $class = '')
{
if (\is_array($iterator)) {
$iterator = new ArrayIterator($iterator);
}
parent::__construct($iterator, $class);
}
}
38 changes: 28 additions & 10 deletions src/batch/src/Job/Item/ItemJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -95,16 +95,18 @@ protected function doExecute(JobExecution $jobExecution): void

$summary->increment('processed');

$itemsToWrite[] = $processedItem;
$writeCount++;

if (0 === $writeCount % $this->batchSize) {
$this->writer->write($itemsToWrite);
$summary->increment('write', $writeCount);
$itemsToWrite = [];
$writeCount = 0;

$this->executionStorage->store($rootExecution);
foreach ($this->getItemsToWrite($processedItem) as $item) {
$itemsToWrite[] = $item;
$writeCount++;

if (0 === $writeCount % $this->batchSize) {
$this->writer->write($itemsToWrite);
$summary->increment('write', $writeCount);
$itemsToWrite = [];
$writeCount = 0;

$this->executionStorage->store($rootExecution);
}
}
}

Expand All @@ -118,6 +120,22 @@ protected function doExecute(JobExecution $jobExecution): void
$this->flushElements();
}

/**
* Analyse processed item to determine the items to write.
*
* @param mixed $processedItem The processed item
*
* @return iterable A list of items to write
*/
protected function getItemsToWrite($processedItem): iterable
{
if ($processedItem instanceof ExpandProcessedItem) {
return $processedItem;
}

return [$processedItem];
}

/**
* Set up elements before execution.
*
Expand Down
23 changes: 23 additions & 0 deletions src/batch/src/Job/Item/Processor/CallbackProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

declare(strict_types=1);

namespace Yokai\Batch\Job\Item\Processor;

use Closure;
use Yokai\Batch\Job\Item\ItemProcessorInterface;

final class CallbackProcessor implements ItemProcessorInterface
{
private Closure $callback;

public function __construct(Closure $callback)
{
$this->callback = $callback;
}

public function process($item)
{
return ($this->callback)($item);
}
}
30 changes: 30 additions & 0 deletions src/batch/src/Test/Job/Item/Writer/InMemoryWriter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace Yokai\Batch\Test\Job\Item\Writer;

use Yokai\Batch\Job\Item\InitializableInterface;
use Yokai\Batch\Job\Item\ItemWriterInterface;

final class InMemoryWriter implements ItemWriterInterface, InitializableInterface
{
private array $items = [];

public function initialize(): void
{
$this->items = [];
}

public function write(iterable $items): void
{
foreach ($items as $item) {
$this->items[] = $item;
}
}

public function getItems(): array
{
return $this->items;
}
}
32 changes: 32 additions & 0 deletions src/batch/tests/Job/Item/ItemJobTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@
use Prophecy\Argument;
use Prophecy\PhpUnit\ProphecyTrait;
use Prophecy\Prophecy\ObjectProphecy;
use Yokai\Batch\Job\Item\ExpandProcessedItem;
use Yokai\Batch\Job\Item\FlushableInterface;
use Yokai\Batch\Job\Item\InitializableInterface;
use Yokai\Batch\Job\Item\InvalidItemException;
use Yokai\Batch\Job\Item\ItemJob;
use Yokai\Batch\Job\Item\ItemProcessorInterface;
use Yokai\Batch\Job\Item\ItemReaderInterface;
use Yokai\Batch\Job\Item\ItemWriterInterface;
use Yokai\Batch\Job\Item\Processor\CallbackProcessor;
use Yokai\Batch\Job\Item\Reader\StaticIterableReader;
use Yokai\Batch\Job\JobExecutionAwareInterface;
use Yokai\Batch\Job\JobParametersAwareInterface;
use Yokai\Batch\Job\SummaryAwareInterface;
use Yokai\Batch\JobExecution;
use Yokai\Batch\JobParameters;
use Yokai\Batch\Storage\NullJobExecutionStorage;
use Yokai\Batch\Summary;
use Yokai\Batch\Test\Job\Item\Writer\InMemoryWriter;
use Yokai\Batch\Tests\Util;

class ItemJobTest extends TestCase
Expand Down Expand Up @@ -104,6 +108,34 @@ public function testExecute(): void
self::assertEquals($expectedLogs, $log);
}

public function testWithExpandItem(): void
{
$job = new ItemJob(
4,
new StaticIterableReader(['eggplant', 'tomato', 'avocado']),
new CallbackProcessor(fn($item) => new ExpandProcessedItem(['fruit:' . $item, 'vegetable:' . $item])),
$writer = new InMemoryWriter(),
new NullJobExecutionStorage()
);

$job->execute($execution = JobExecution::createRoot('123456', 'testing'));

self::assertSame(
[
'fruit:eggplant',
'vegetable:eggplant',
'fruit:tomato',
'vegetable:tomato',
'fruit:avocado',
'vegetable:avocado',
],
$writer->getItems()
);
self::assertSame(3, $execution->getSummary()->get('read'));
self::assertSame(3, $execution->getSummary()->get('processed'));
self::assertSame(6, $execution->getSummary()->get('write'));
}

private function configureItemElement(ObjectProphecy $element, string $role, string &$log): void
{
/** @var ObjectProphecy|JobExecutionAwareInterface|JobParametersAwareInterface|SummaryAwareInterface|InitializableInterface|FlushableInterface $element */
Expand Down
19 changes: 19 additions & 0 deletions src/batch/tests/Job/Item/Processor/CallbackProcessorTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);

namespace Yokai\Batch\Tests\Job\Item\Processor;

use Yokai\Batch\Job\Item\Processor\CallbackProcessor;
use PHPUnit\Framework\TestCase;

class CallbackProcessorTest extends TestCase
{
public function testProcess(): void
{
$processor = new CallbackProcessor(fn($item) => \mb_strtolower($item));

self::assertSame('john', $processor->process('John'));
self::assertSame('doe', $processor->process('DOE'));
}
}
26 changes: 26 additions & 0 deletions src/batch/tests/Test/Job/Item/Writer/InMemoryWriterTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Yokai\Batch\Tests\Test\Job\Item\Writer;

use Yokai\Batch\Test\Job\Item\Writer\InMemoryWriter;
use PHPUnit\Framework\TestCase;

class InMemoryWriterTest extends TestCase
{
public function test(): void
{
$writer = new InMemoryWriter();
self::assertSame([], $writer->getItems());
$writer->initialize();
self::assertSame([], $writer->getItems());
$writer->write([1, 2, 3]);
self::assertSame([1, 2, 3], $writer->getItems());
$writer->write([4, 5, 6]);
self::assertSame([1, 2, 3, 4, 5, 6], $writer->getItems());
$writer->initialize();
$writer->write([7, 8, 9]);
self::assertSame([7, 8, 9], $writer->getItems());
}
}