From 228653e3dc36b3bdbda602c5e264f3756d87c3b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Timothe=CC=81e=20Barray?= Date: Wed, 28 Aug 2013 10:58:57 +0200 Subject: [PATCH] Initial src --- .atoum.bootstrap.php | 2 + .atoum.php | 17 ++ .gitignore | 5 + README.md | 23 +- composer.json | 44 ++++ examples/basic.php | 50 +++++ examples/fixtures.csv | 3 + src/Rezzza/JobFlow/AbstractJobType.php | 43 ++++ .../JobFlow/Extension/BaseExtension.php | 137 ++++++++++++ .../JobFlow/Extension/Core/CoreExtension.php | 28 +++ .../JobFlow/Extension/Core/Type/ETLType.php | 33 +++ .../Core/Type/Extractor/ExtractorType.php | 49 +++++ .../JobFlow/Extension/Core/Type/JobType.php | 42 ++++ .../Extension/Core/Type/Loader/LoaderType.php | 56 +++++ .../Core/Type/Transformer/TransformerType.php | 61 ++++++ .../Extension/Core/Wrapper/CsvWrapper.php | 22 ++ .../Extension/Core/Wrapper/JobWrapper.php | 13 ++ .../Extension/Core/Wrapper/JsonWrapper.php | 20 ++ .../Extension/Core/Wrapper/TsvWrapper.php | 13 ++ .../Extension/JobExtensionInterface.php | 17 ++ src/Rezzza/JobFlow/Io/AbstractStream.php | 57 +++++ src/Rezzza/JobFlow/Io/Input.php | 22 ++ src/Rezzza/JobFlow/Io/IoDescriptor.php | 36 +++ src/Rezzza/JobFlow/Io/IoResolver.php | 57 +++++ src/Rezzza/JobFlow/Io/IoWrapperInterface.php | 8 + src/Rezzza/JobFlow/Io/Output.php | 11 + src/Rezzza/JobFlow/Job.php | 148 +++++++++++++ src/Rezzza/JobFlow/JobBuilder.php | 167 ++++++++++++++ src/Rezzza/JobFlow/JobContext.php | 95 ++++++++ src/Rezzza/JobFlow/JobContextInterface.php | 11 + src/Rezzza/JobFlow/JobFactory.php | 114 ++++++++++ src/Rezzza/JobFlow/JobFactoryBuilder.php | 115 ++++++++++ src/Rezzza/JobFlow/JobInterface.php | 11 + src/Rezzza/JobFlow/JobMessage.php | 35 +++ src/Rezzza/JobFlow/JobRegistry.php | 84 +++++++ src/Rezzza/JobFlow/JobStep.php | 16 ++ src/Rezzza/JobFlow/JobTypeInterface.php | 36 +++ src/Rezzza/JobFlow/Jobs.php | 38 ++++ src/Rezzza/JobFlow/ResolvedJob.php | 145 +++++++++++++ src/Rezzza/JobFlow/Resources/readme.md | 7 + .../JobFlow/Scheduler/ExecutionContext.php | 195 +++++++++++++++++ src/Rezzza/JobFlow/Scheduler/JobGraph.php | 71 ++++++ src/Rezzza/JobFlow/Scheduler/JobScheduler.php | 205 ++++++++++++++++++ .../Scheduler/Strategy/DirectStrategy.php | 8 + .../Scheduler/Strategy/StrategyInterface.php | 8 + .../Scheduler/Transport/AbstractTransport.php | 23 ++ .../Scheduler/Transport/PhpTransport.php | 15 ++ .../Scheduler/Transport/RabbitMqTransport.php | 30 +++ .../Transport/TransportInterface.php | 10 + tests/unit/Job.php | 37 ++++ 50 files changed, 2491 insertions(+), 2 deletions(-) create mode 100644 .atoum.bootstrap.php create mode 100644 .atoum.php create mode 100644 .gitignore create mode 100644 composer.json create mode 100644 examples/basic.php create mode 100644 examples/fixtures.csv create mode 100644 src/Rezzza/JobFlow/AbstractJobType.php create mode 100644 src/Rezzza/JobFlow/Extension/BaseExtension.php create mode 100644 src/Rezzza/JobFlow/Extension/Core/CoreExtension.php create mode 100644 src/Rezzza/JobFlow/Extension/Core/Type/ETLType.php create mode 100644 src/Rezzza/JobFlow/Extension/Core/Type/Extractor/ExtractorType.php create mode 100644 src/Rezzza/JobFlow/Extension/Core/Type/JobType.php create mode 100644 src/Rezzza/JobFlow/Extension/Core/Type/Loader/LoaderType.php create mode 100644 src/Rezzza/JobFlow/Extension/Core/Type/Transformer/TransformerType.php create mode 100644 src/Rezzza/JobFlow/Extension/Core/Wrapper/CsvWrapper.php create mode 100644 src/Rezzza/JobFlow/Extension/Core/Wrapper/JobWrapper.php create mode 100644 src/Rezzza/JobFlow/Extension/Core/Wrapper/JsonWrapper.php create mode 100644 src/Rezzza/JobFlow/Extension/Core/Wrapper/TsvWrapper.php create mode 100644 src/Rezzza/JobFlow/Extension/JobExtensionInterface.php create mode 100644 src/Rezzza/JobFlow/Io/AbstractStream.php create mode 100644 src/Rezzza/JobFlow/Io/Input.php create mode 100644 src/Rezzza/JobFlow/Io/IoDescriptor.php create mode 100644 src/Rezzza/JobFlow/Io/IoResolver.php create mode 100644 src/Rezzza/JobFlow/Io/IoWrapperInterface.php create mode 100644 src/Rezzza/JobFlow/Io/Output.php create mode 100644 src/Rezzza/JobFlow/Job.php create mode 100644 src/Rezzza/JobFlow/JobBuilder.php create mode 100644 src/Rezzza/JobFlow/JobContext.php create mode 100644 src/Rezzza/JobFlow/JobContextInterface.php create mode 100644 src/Rezzza/JobFlow/JobFactory.php create mode 100644 src/Rezzza/JobFlow/JobFactoryBuilder.php create mode 100644 src/Rezzza/JobFlow/JobInterface.php create mode 100644 src/Rezzza/JobFlow/JobMessage.php create mode 100644 src/Rezzza/JobFlow/JobRegistry.php create mode 100644 src/Rezzza/JobFlow/JobStep.php create mode 100644 src/Rezzza/JobFlow/JobTypeInterface.php create mode 100644 src/Rezzza/JobFlow/Jobs.php create mode 100644 src/Rezzza/JobFlow/ResolvedJob.php create mode 100644 src/Rezzza/JobFlow/Resources/readme.md create mode 100644 src/Rezzza/JobFlow/Scheduler/ExecutionContext.php create mode 100644 src/Rezzza/JobFlow/Scheduler/JobGraph.php create mode 100644 src/Rezzza/JobFlow/Scheduler/JobScheduler.php create mode 100644 src/Rezzza/JobFlow/Scheduler/Strategy/DirectStrategy.php create mode 100644 src/Rezzza/JobFlow/Scheduler/Strategy/StrategyInterface.php create mode 100644 src/Rezzza/JobFlow/Scheduler/Transport/AbstractTransport.php create mode 100644 src/Rezzza/JobFlow/Scheduler/Transport/PhpTransport.php create mode 100644 src/Rezzza/JobFlow/Scheduler/Transport/RabbitMqTransport.php create mode 100644 src/Rezzza/JobFlow/Scheduler/Transport/TransportInterface.php create mode 100644 tests/unit/Job.php diff --git a/.atoum.bootstrap.php b/.atoum.bootstrap.php new file mode 100644 index 0000000..4a2fec1 --- /dev/null +++ b/.atoum.bootstrap.php @@ -0,0 +1,2 @@ +addTestAllDirectory(__DIR__.'/tests/unit'); + +$script + ->addDefaultReport() + ->addField(new atoum\report\fields\runner\result\logo()) + ->addField(new atoum\report\fields\runner\coverage\html( + 'Code coverage', + __DIR__.'/web/code-coverage' + ) + ) +; + +$script->noCodeCoverageForNamespaces(array('mageekguy', 'symfony')); +$script->bootstrapFile(__DIR__ . DIRECTORY_SEPARATOR . '.atoum.bootstrap.php'); diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..681be19 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +bin +composer.lock +vendor +web +examples/temp \ No newline at end of file diff --git a/README.md b/README.md index 39af5d3..0938f8e 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,23 @@ -jobflow +JobFlow ======= -Simpler, Easier, Faster +Makes batch jobs creation Simpler, Easier, Faster. + +ETL pattern support thanks to : https://github.com/docteurklein/php-etl + +**Warning** : This code has not been executed in production. + +Usage +----- + +Have a look to the [first example](/examples/basic.php) + +Tests +----- + +Units tests powered by atoum : https://github.com/atoum/atoum + +Credits +------- + +Builder Architecture heavily inspired by Symfony Form Component : https://github.com/symfony/Form \ No newline at end of file diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..cd38aed --- /dev/null +++ b/composer.json @@ -0,0 +1,44 @@ +{ + "name": "rezzza/jobflow", + "description": "Makes batch job creation easier", + "keywords": ["batch", "job", "etl"], + "type": "standalone", + "homepage": "https://github.com/rezzza/jobflow", + "license": "MIT", + "authors": [{ + "name": "Community contributors", + "homepage": "https://github.com/rezzza/jobflow/contributors" + }], + "require": { + "php": ">=5.4.0", + + "symfony/options-resolver": "~2.3", + "knplabs/etl": "dev-waiting" + }, + "suggest": { + "doctrine/orm": "To use ORM loader" + }, + "require-dev": { + "atoum/atoum": "*@dev", + "monolog/monolog": "*" + }, + "autoload": { + "psr-0": { + "Rezzza\\JobFlow": "src/" + } + }, + "extra": { + "branch-alias": { + "dev-master": "1.0.x-dev" + } + }, + "repositories": [ + { + "type": "vcs", + "url": "https://github.com/tyx/php-etl" + } + ], + "config": { + "bin-dir": "bin" + } +} diff --git a/examples/basic.php b/examples/basic.php new file mode 100644 index 0000000..3db132b --- /dev/null +++ b/examples/basic.php @@ -0,0 +1,50 @@ +setLogger($logger); + +$io = new Io\IoDescriptor( + new Io\Input('file://'.__DIR__.'/fixtures.csv'), + new Io\Output('file:///'.__DIR__.'/temp/result.json') +); + +$job = $jobFactory->createBuilder('job', $io) + ->add( + 'example_extractor', + 'extractor' + ) + ->add( + 'example_transformer', + 'transformer', + array( + 'transformer_callback' => function($data, $target) { + $target['firstname'] = $data[0]; + $target['name'] = $data[1]; + $target['url'] = sprintf('http://www.lequipe.fr/Football/FootballFicheJoueur%s.html', $data[2]); + + return json_encode($target, JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES); + } + ) + ) + ->add( + 'example_loader', + 'loader' + ) + ->getJob() +; + +$scheduler + ->setJob($job) + ->init() + ->run() +; \ No newline at end of file diff --git a/examples/fixtures.csv b/examples/fixtures.csv new file mode 100644 index 0000000..4055b76 --- /dev/null +++ b/examples/fixtures.csv @@ -0,0 +1,3 @@ +Zinedine;Zidane;4112 +Michel;Platini;9663 +Raymond;Kopa;13179 \ No newline at end of file diff --git a/src/Rezzza/JobFlow/AbstractJobType.php b/src/Rezzza/JobFlow/AbstractJobType.php new file mode 100644 index 0000000..b49a52e --- /dev/null +++ b/src/Rezzza/JobFlow/AbstractJobType.php @@ -0,0 +1,43 @@ + + */ +abstract class AbstractJobType implements JobTypeInterface +{ + /** + * {@inheritdoc} + */ + public function execute($input, ExecutionContext $execution) + { + } + + /** + * {@inheritdoc} + */ + public function buildJob(JobBuilder $builder, array $options) + { + } + + /** + * {@inheritdoc} + */ + public function setDefaultOptions(OptionsResolverInterface $resolver) + { + } + + /** + * {@inheritdoc} + */ + public function getParent() + { + return 'job'; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Extension/BaseExtension.php b/src/Rezzza/JobFlow/Extension/BaseExtension.php new file mode 100644 index 0000000..ccc5589 --- /dev/null +++ b/src/Rezzza/JobFlow/Extension/BaseExtension.php @@ -0,0 +1,137 @@ +types[$type->getName()] = $type; + } + + /** + * {@inheritdoc} + */ + public function getType($name) + { + if (null === $this->types) { + $this->initTypes(); + } + + if (!isset($this->types[$name])) { + throw new \InvalidArgumentException(sprintf('The type "%s" can not be loaded by this extension', $name)); + } + + return $this->types[$name]; + } + + /** + * {@inheritdoc} + */ + public function hasType($name) + { + if (null === $this->types) { + $this->initTypes(); + } + + return isset($this->types[$name]); + } + + public function addWrapper(IoWrapperInterface $wrapper) + { + $this->wrappers[$wrapper->getName()] = $wrapper; + } + + /** + * {@inheritdoc} + */ + public function getWrapper($name) + { + if (null === $this->wrappers) { + $this->initWrappers(); + } + + if (!isset($this->wrappers[$name])) { + throw new \InvalidArgumentException(sprintf('The wrapper "%s" can not be loaded by this extension', $name)); + } + + return $this->wrappers[$name]; + } + + /** + * {@inheritdoc} + */ + public function hasWrapper($name) + { + if (null === $this->wrappers) { + $this->initWrappers(); + } + + return isset($this->wrappers[$name]); + } + + /** + * Registers the types. + * + * @return JobTypeInterface[] + */ + protected function loadTypes() + { + return array(); + } + + /** + * Registers the wrappers. + * + * @return IoWrapperInterface[] + */ + protected function loadWrappers() + { + return array(); + } + + /** + * Initializes the types. + */ + private function initTypes() + { + $this->types = array(); + + foreach ($this->loadTypes() as $type) { + if (!$type instanceof JobTypeInterface) { + throw new \InvalidArgumentException(sprintf('Type %s should implements JobTypeInterface', get_class($type))); + } + + $this->types[$type->getName()] = $type; + } + } + + /** + * Initializes the types. + */ + private function initWrappers() + { + $this->wrappers = array(); + + foreach ($this->loadWrappers() as $wrapper) { + if (!$wrapper instanceof IoWrapperInterface) { + throw new \InvalidArgumentException(sprintf('Wrapper %s should implements IoWrapperInterface', get_class($wrapper))); + } + + $this->wrappers[$wrapper->getName()] = $wrapper; + } + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Extension/Core/CoreExtension.php b/src/Rezzza/JobFlow/Extension/Core/CoreExtension.php new file mode 100644 index 0000000..48b25ab --- /dev/null +++ b/src/Rezzza/JobFlow/Extension/Core/CoreExtension.php @@ -0,0 +1,28 @@ + + */ +abstract class ETLType extends AbstractJobType +{ + abstract function getETLType(); + + public function setDefaultOptions(OptionsResolverInterface $resolver) + { + $resolver->setDefaults(array( + 'etl_type' => $this->getETLType() + )); + } + + protected function isLoggable($object) + { + if (!is_object($object)) { + return false; + } + + return in_array('Psr\Log\LoggerAwareTrait', class_uses(get_class($object))); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Extension/Core/Type/Extractor/ExtractorType.php b/src/Rezzza/JobFlow/Extension/Core/Type/Extractor/ExtractorType.php new file mode 100644 index 0000000..b5bea0f --- /dev/null +++ b/src/Rezzza/JobFlow/Extension/Core/Type/Extractor/ExtractorType.php @@ -0,0 +1,49 @@ +isLoggable($input)) { + $input->setLogger($execution->getLogger()); + } + + $execution->setGlobalOption('total', $input->count()); + + $offset = $execution->getOption('offset'); + $limit = $execution->getOption('limit'); + $results = array(); + + $input->seek($offset); + $etl = new ETL\Context\Context(); + + // Skip Header + if ($offset === 0) { + $input->extract($etl); + } + + for ($i = 0; $i < $limit && $input->valid(); $i++) { + $results[] = $input->current(); + $input->next(); + } + + return $results; + } + + public function getName() + { + return 'extractor'; + } + + public function getETLType() + { + return 'extractor'; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Extension/Core/Type/JobType.php b/src/Rezzza/JobFlow/Extension/Core/Type/JobType.php new file mode 100644 index 0000000..449b79f --- /dev/null +++ b/src/Rezzza/JobFlow/Extension/Core/Type/JobType.php @@ -0,0 +1,42 @@ + + */ +class JobType extends AbstractJobType +{ + public function setDefaultOptions(OptionsResolverInterface $resolver) + { + $resolver->setOptional(array( + 'io' + )); + + $resolver->setAllowedTypes(array( + 'io' => 'Rezzza\JobFlow\Io\IoDescriptor', + )); + } + + /** + * {@inheritdoc} + */ + public function getParent() + { + return null; + } + + /** + * {@inheritdoc} + */ + public function getName() + { + return 'job'; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Extension/Core/Type/Loader/LoaderType.php b/src/Rezzza/JobFlow/Extension/Core/Type/Loader/LoaderType.php new file mode 100644 index 0000000..968d2c4 --- /dev/null +++ b/src/Rezzza/JobFlow/Extension/Core/Type/Loader/LoaderType.php @@ -0,0 +1,56 @@ +loader = $options['loader']; + } + + public function execute($input, ExecutionContext $execution) + { + if ($this->isLoggable($this->loader)) { + $this->loader->setLogger($execution->getLogger()); + } + + foreach ($input as $d) { + $this->loader->load($d, new ETL\Context\Context()); + } + + return []; // End chain should return empty array + } + + public function setDefaultOptions(OptionsResolverInterface $resolver) + { + $type = $this; + + $resolver->setDefaults(array( + 'class' => 'Knp\ETL\Loader\FileLoader', + 'loader' => function(Options $options) use ($type) { + $class = $options['class']; + + return new $class($options['io']->stdout->getDsn()); + } + )); + } + + public function getName() + { + return 'loader'; + } + + public function getETLType() + { + return 'loader'; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Extension/Core/Type/Transformer/TransformerType.php b/src/Rezzza/JobFlow/Extension/Core/Type/Transformer/TransformerType.php new file mode 100644 index 0000000..6295ccc --- /dev/null +++ b/src/Rezzza/JobFlow/Extension/Core/Type/Transformer/TransformerType.php @@ -0,0 +1,61 @@ +transformer = new $class($callback); + } + + public function execute($input, ExecutionContext $execution) + { + $results = []; + + foreach ($input as $k => $result) { + $execution->getLogger()->debug('transformation '.$k); + $etlContext = new ETL\Context\Context(); + + $results[] = $this->transformer->transform($result, $etlContext); + } + + return $results; + } + + public function setDefaultOptions(OptionsResolverInterface $resolver) + { + $resolver->setDefaults(array( + 'transformer' => 'Knp\ETL\Transformer\CallbackTransformer' + )); + + $resolver->setRequired(array( + 'transformer_callback' + )); + + $resolver->setAllowedTypes(array( + 'transformer_callback' => 'callable' + )); + } + + public function getName() + { + return 'transformer'; + } + + public function getETLType() + { + return 'transformer'; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Extension/Core/Wrapper/CsvWrapper.php b/src/Rezzza/JobFlow/Extension/Core/Wrapper/CsvWrapper.php new file mode 100644 index 0000000..68dbd2a --- /dev/null +++ b/src/Rezzza/JobFlow/Extension/Core/Wrapper/CsvWrapper.php @@ -0,0 +1,22 @@ +delimiter); + } + + public function getName() + { + return 'csv'; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Extension/Core/Wrapper/JobWrapper.php b/src/Rezzza/JobFlow/Extension/Core/Wrapper/JobWrapper.php new file mode 100644 index 0000000..ebb4383 --- /dev/null +++ b/src/Rezzza/JobFlow/Extension/Core/Wrapper/JobWrapper.php @@ -0,0 +1,13 @@ + + */ +interface JobExtensionInterface +{ + public function getType($name); + + public function hasType($name); + + public function getWrapper($name); + + public function hasWrapper($name); +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Io/AbstractStream.php b/src/Rezzza/JobFlow/Io/AbstractStream.php new file mode 100644 index 0000000..e3ff16d --- /dev/null +++ b/src/Rezzza/JobFlow/Io/AbstractStream.php @@ -0,0 +1,57 @@ +fileInfo = new \SplFileInfo($dsn); + $this->dsn = $dsn; + $this->wrapper = $wrapper; + $this->format = $format ?: $this->fileInfo->getExtension(); + } + + public function getDsn() + { + return $this->dsn; + } + + public function getFormat() + { + return $this->format; + } + + public function getWrapper() + { + if (null === $this->wrapper) { + throw new \RuntimeException(sprintf('No wrapper defined for stream with dsn : "%s"', $this->dsn)); + } + + return $this->wrapper; + } + + public function setWrapper($wrapper) + { + $this->wrapper = $wrapper; + $this->resolved = true; + } + + public function isConnectedTo($name) + { + // path get the started slash, so substr is needed + return substr($this->parts['path'], 1) === $name; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Io/Input.php b/src/Rezzza/JobFlow/Io/Input.php new file mode 100644 index 0000000..1135e7f --- /dev/null +++ b/src/Rezzza/JobFlow/Io/Input.php @@ -0,0 +1,22 @@ + + */ +class Input extends AbstractStream +{ + public function read() + { + return $this->getWrapper()->read($this->parts['path']); + } + + /** + * All input except the first one should have a job scheme + */ + public function isFirstStep() + { + return $this->parts['scheme'] !== 'job'; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Io/IoDescriptor.php b/src/Rezzza/JobFlow/Io/IoDescriptor.php new file mode 100644 index 0000000..6e9a3d9 --- /dev/null +++ b/src/Rezzza/JobFlow/Io/IoDescriptor.php @@ -0,0 +1,36 @@ +stdin = $stdin; + $this->stdout = $stdout; + $this->stderr = $stderr; + } + + public function read() + { + return $this->stdin->read(); + } + + public function write() + { + return $this->stdout->write(); + } + + public function getIterator() + { + return new \ArrayIterator($this); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Io/IoResolver.php b/src/Rezzza/JobFlow/Io/IoResolver.php new file mode 100644 index 0000000..8dc90d4 --- /dev/null +++ b/src/Rezzza/JobFlow/Io/IoResolver.php @@ -0,0 +1,57 @@ +registry = $registry; + } + + public function resolve($descriptor) + { + if (null === $descriptor) { + return null; + } + + foreach ($descriptor as $stream) { + if (!$stream instanceof AbstractStream) { + continue; + } + + $dsn = parse_url($stream->getDsn()); + + if (false === $dsn) { + throw new \RuntimeException(sprintf('Cannot parse dsn : %s', $stream->getDsn())); + } + + if (!isset($dsn['scheme'])) { + throw new \RuntimeException(sprintf('dsn "%s" should follow standard format "scheme://host/path"', $stream->getDsn())); + } + + $protocol = $dsn['scheme']; + $phpSupported = $this->isPhpSupported($protocol); + + if (false === $phpSupported) { + $wrapper = $this->registry->getWrapper($protocol); + } else { + $wrapper = $this->registry->getWrapper($stream->getFormat()); + } + + $stream->setWrapper($wrapper); + $stream->parts = $dsn; + } + + return $descriptor; + } + + public function isPhpSupported($protocol) + { + return in_array($protocol, stream_get_wrappers()); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Io/IoWrapperInterface.php b/src/Rezzza/JobFlow/Io/IoWrapperInterface.php new file mode 100644 index 0000000..87531dc --- /dev/null +++ b/src/Rezzza/JobFlow/Io/IoWrapperInterface.php @@ -0,0 +1,8 @@ +wrapper->getOutput($this->parts['path']); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Job.php b/src/Rezzza/JobFlow/Job.php new file mode 100644 index 0000000..732260f --- /dev/null +++ b/src/Rezzza/JobFlow/Job.php @@ -0,0 +1,148 @@ + + */ +class Job implements \IteratorAggregate, JobInterface +{ + /** + * @var JobInterface[] + */ + protected $children = array(); + + /** + * @var string + */ + protected $name; + + /** + * Contains Job Service and IoDescriptor. Helps to build Job easier + * + * @var ResolvedJob + */ + protected $resolved; + + /** + * @var array + */ + protected $options; + + /** + * When a job is processed by JobScheduler we need to ensure it will not change + * + * @var boolean + */ + protected $locked = false; + + /** + * @var string $name + * @var ResolvedJob $resolved + * @var array $options + */ + public function __construct($name, ResolvedJob $resolved, array $options = array()) + { + $this->name = $name; + $this->resolved = $resolved; + $this->options = $options; + } + + /** + * @param JobInterface $child + */ + public function add(JobInterface $child) + { + if ($this->isLocked()) { + throw new \RuntimeException('Cannot add child on job locked'); + } + + $this->children[$child->getName()] = $child; + } + + /** + * @return JobInterface + */ + public function get($name) + { + if (!array_key_exists($name, $this->children)) { + throw new \LogicException(sprintf('No child with name : "%s" in job "%s"', $name, $this->name)); + } + + return $this->children[$name]; + } + + /** + * @return array + */ + public function getOptions() + { + return $this->options; + } + + /** + * @return ResolvedJob + */ + public function getResolved() + { + return $this->resolved; + } + + /** + * @return string + */ + public function getName() + { + return $this->name; + } + + /** + * @return JobInterface[] + */ + public function getChildren() + { + return $this->children; + } + + /** + * @param boolean $locked + */ + public function setLocked($locked) + { + $this->locked = $locked; + } + + /** + * @return boolean + */ + public function isLocked() + { + return $this->locked; + } + + public function execute($input, $context) + { + if ($this->isLocked()) { + throw new \RuntimeException('Cannot execute job not locked'); + } + + return $this->getResolved()->execute($input, $context); + } + + /** + * Returns the iterator for this job. + * + * @return \RecursiveArrayIterator + */ + public function getIterator() + { + return new \RecursiveArrayIterator($this->children); + } + + public function __toString() + { + return $this->getName(); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/JobBuilder.php b/src/Rezzza/JobFlow/JobBuilder.php new file mode 100644 index 0000000..da4d92f --- /dev/null +++ b/src/Rezzza/JobFlow/JobBuilder.php @@ -0,0 +1,167 @@ + + */ +class JobBuilder +{ + /** + * @var string + */ + protected $name; + + /** + * @var array + */ + protected $children = array(); + + /** + * @var array + */ + protected $unresolvedChildren = array(); + + /** + * @var JobFactory + */ + protected $jobFactory; + + /** + * @var ResolvedJob + */ + protected $resolved; + + protected $io; + + /** + * @var array + */ + protected $options = array(); + + /** + * @param string $name + * @param JobFactory $jobFactory + * @param array $options + */ + public function __construct($name, JobFactory $jobFactory, array $options = array()) + { + $this->name = $name; + //$this->io = $io; + $this->jobFactory = $jobFactory; + $this->options = $options; + } + + /** + * Add sub job to a job + * + * @param string $child Name of the child + * @param mixed $type The JobTypeInterface or the alias of the job type registered as a service + * @param IoDescriptor $io To connect jobs together + * @param array $options + * + * @return JobBuilder + */ + public function add($child, $type, array $options = array()) + { + $this->children[$child] = null; // to keep order + $this->unresolvedChildren[$child] = array( + 'type' => $type, + 'options' => $options + ); + + return $this; + } + + /** + * Create new JobBuilder + * + * @param string $name + * @param mixed $type The JobTypeInterface or the alias of the job type registered as a service + * @param IoDescriptor $io To connect jobs together + * @param array $options + * + * @return JobBuilder + */ + public function create($name, $type, IoDescriptor $io = null, array $options = array()) + { + return $this->jobFactory->createNamedBuilder($name, $type, $io, $options); + } + + /** + * Create the job with all children configure + * + * @return Job + */ + public function getJob() + { + $this->resolveChildren(); + + $job = new Job($this->name, $this->resolved, $this->getOptions()); + + foreach ($this->children as $child) { + $job->add($child->getJob()); + } + + return $job; + } + + /** + * @param ResolvedJob $resolved + */ + public function setResolved(ResolvedJob $resolved) + { + $this->resolved = $resolved; + } + + /** + * @return ResolvedJob + */ + public function getResolved() + { + return $this->resolved; + } + + /** + * @return string + */ + public function getName() + { + return $this->name; + } + + /** + * @return array + */ + public function getOptions() + { + return $this->options; + } + + /** + * @param string $name + * @param mixed $default + * + * @return mixed + */ + public function getOption($name, $default = null) + { + return array_key_exists($name, $this->options) ? $this->options[$name] : $default; + } + + /** + * For each child added, we create a new JobBuilder around it to make fully configurable each sub job + */ + private function resolveChildren() + { + foreach ($this->unresolvedChildren as $name => $info) { + $this->children[$name] = $this->create($name, $info['type'], $this->getOption('io'), $info['options']); + } + + $this->unresolvedChildren = array(); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/JobContext.php b/src/Rezzza/JobFlow/JobContext.php new file mode 100644 index 0000000..c14a88c --- /dev/null +++ b/src/Rezzza/JobFlow/JobContext.php @@ -0,0 +1,95 @@ + + */ +class JobContext implements JobContextInterface +{ + /** + * The job id to execute + * + * @var string + */ + public $jobId; + + /** + * Current child job in execution + */ + public $current; + + /** + * Steps already executed + */ + public $steps = array(); + + public $options = array(); + + public function __construct($jobId) + { + $this->jobId = $jobId; + $this->options = array( + 'total' => null, + 'offset' => 0, + 'limit' => 10 + ); + } + + public function updateToNextJob(JobGraph $graph) + { + // We stock we executed this job + $this->addStep($this->current); + + if ($graph->hasNextJob()) { + $nextJob = $graph->getNextJob(); + } else { + $this->options['offset'] += $this->options['limit']; + $nextJob = null; + + if (!$this->isFinished()) { + // Check if we should reloop + $nextJob = $graph->getJob(0); + } + } + + $this->current = $nextJob; + } + + public function addStep($step) + { + $this->steps[] = $step; + } + + public function getPrevious() + { + return end($this->steps); + } + + public function setNext(array $next) + { + $this->next = $next; + } + + public function hasNext() + { + return count($this->next) > 0; + } + + public function isFinished() + { + return $this->options['total'] <= $this->options['offset']; + } + + public function isStarting() + { + return count($this->steps) === 0; + } + + public function getMessageName() + { + return sprintf('%s', $this->jobId); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/JobContextInterface.php b/src/Rezzza/JobFlow/JobContextInterface.php new file mode 100644 index 0000000..12447f5 --- /dev/null +++ b/src/Rezzza/JobFlow/JobContextInterface.php @@ -0,0 +1,11 @@ + + */ +interface JobContextInterface +{ + +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/JobFactory.php b/src/Rezzza/JobFlow/JobFactory.php new file mode 100644 index 0000000..16aced0 --- /dev/null +++ b/src/Rezzza/JobFlow/JobFactory.php @@ -0,0 +1,114 @@ +get('job_flow.factory')` + * + * @author Timothée Barray + */ +class JobFactory +{ + /** + * @var JobRegistry + */ + protected $registry; + + protected $ioResolver; + + /** + * @param JobRegistry $registry + * @param IoResolver $ioResolver + */ + public function __construct(JobRegistry $registry, IoResolver $ioResolver) + { + $this->registry = $registry; + $this->ioResolver = $ioResolver; + } + + /** + * Create a job + * + * @param mixed $type The JobTypeInterface or the alias of the job type registered as a service + * @param array $options + * + * @return Job + */ + public function create($type, array $options = array()) + { + return $this->createBuilder($type, $options)->getJob(); + } + + /** + * Create a builder + * + * @param mixed $type The JobTypeInterface or the alias of the job type registered as a service + * @param array $options + * + * @return JobBuilder + */ + public function createBuilder($type = 'job', IoDescriptor $io = null, array $options = array()) + { + $name = $type instanceof JobTypeInterface || $type instanceof ResolvedJob + ? $type->getName() + : $type; + + return $this->createNamedBuilder($name, $type, $io, $options); + } + + /** + * @param string $name + * @param mixed $type The JobTypeInterface or the alias of the job type registered as a service + * @param IoDescriptor $io To connect jobs together + * @param array $options + * + * @return JobBuilder + */ + public function createNamedBuilder($name, $type = 'job', IoDescriptor $io = null, array $options = array()) + { + if (is_string($type)) { + $type = $this->registry->getType($type); + } + + $io = $this->ioResolver->resolve($io); + + if (null !== $io && !array_key_exists('io', $options)) { + $options['io'] = $io; + } + + if ($type instanceof JobTypeInterface) { + $type = $this->createType($type, $io); + } elseif (!$type instanceof ResolvedJob) { + throw new \InvalidArgumentException(sprintf('Type "%s" should be a string, JobTypeInterface or ResolvedJob', $type)); + } + + return $type->createBuilder($name, $this, $options); + } + + /** + * Create wrapper for combination of JobType and JobConnector + * + * @param JobTypeInterface $type + * @param IoDescriptor $io + * + * @return ResolvedJob + */ + public function createType(JobTypeInterface $type, IoDescriptor $io) + { + $parentType = $type->getParent(); + + if ($parentType instanceof JobTypeInterface) { + $parentType = $this->createType($parentType, $io); + } elseif (null !== $parentType) { + $parentType = $this->registry->getType($parentType); + $parentType = $this->createType($parentType, $io); + } + + return new ResolvedJob($type, $io, $parentType); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/JobFactoryBuilder.php b/src/Rezzza/JobFlow/JobFactoryBuilder.php new file mode 100644 index 0000000..71381f4 --- /dev/null +++ b/src/Rezzza/JobFlow/JobFactoryBuilder.php @@ -0,0 +1,115 @@ +extensions[] = $extension; + + return $this; + } + + /** + * {@inheritdoc} + */ + public function addExtensions(array $extensions) + { + $this->extensions = array_merge($this->extensions, $extensions); + + return $this; + } + + /** + * {@inheritdoc} + */ + public function addType(JobTypeInterface $type) + { + $this->types[$type->getName()] = $type; + + return $this; + } + + /** + * {@inheritdoc} + */ + public function addTypes(array $types) + { + foreach ($types as $type) { + $this->types[$type->getName()] = $type; + } + + return $this; + } + + /** + * {@inheritdoc} + */ + public function addWrapper(IoWrapperInterface $wrapper) + { + $this->wrappers[$wrapper->getName()] = $wrapper; + + return $this; + } + + /** + * {@inheritdoc} + */ + public function addWrappers(array $wrappers) + { + foreach ($wrappers as $wrapper) { + $this->wrappers[$wrapper->getName()] = $wrappers; + } + + return $this; + } + + /** + * {@inheritdoc} + */ + public function getJobFactory() + { + $extensions = $this->extensions; + + if (count($this->types) > 0 || count($this->wrappers) > 0) { + $base = new BaseExtension(); + + foreach ($this->types as $type) { + $base->addType($type); + } + + foreach ($this->wrappers as $wrapper) { + $base->addWrapper($wrapper); + } + + $extensions[] = $base; + } + + $registry = new JobRegistry($extensions); + $resolver = new IoResolver($registry); + + return new JobFactory($registry, $resolver); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/JobInterface.php b/src/Rezzza/JobFlow/JobInterface.php new file mode 100644 index 0000000..d755d12 --- /dev/null +++ b/src/Rezzza/JobFlow/JobInterface.php @@ -0,0 +1,11 @@ + + */ +interface JobInterface +{ + public function execute($input, $execution); +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/JobMessage.php b/src/Rezzza/JobFlow/JobMessage.php new file mode 100644 index 0000000..1f25872 --- /dev/null +++ b/src/Rezzza/JobFlow/JobMessage.php @@ -0,0 +1,35 @@ +context = $context; + } + + public function hasData() + { + return count($this->data) > 0; + } + + public function getData() + { + return $this->data; + } + + public function setData($data) + { + $this->data = $data; + } + + public function __clone() + { + $this->context = clone $this->context; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/JobRegistry.php b/src/Rezzza/JobFlow/JobRegistry.php new file mode 100644 index 0000000..20801de --- /dev/null +++ b/src/Rezzza/JobFlow/JobRegistry.php @@ -0,0 +1,84 @@ + + */ +class JobRegistry +{ + /** + * @var JobTypeInterface[] + */ + protected $types = array(); + + /** + * @var IoWrapperInterface[] + */ + protected $wrappers = array(); + + public function __construct(array $extensions) + { + $this->extensions = $extensions; + } + + /** + * Try to find a JobTypeInterface registered with $name as alias + * + * @param string $id + * + * @return JobTypeInterface + */ + public function getType($name) + { + if (!isset($this->types[$name])) { + $type = null; + + foreach ($this->extensions as $extension) { + if ($extension->hasType($name)) { + $type = $extension->getType($name); + break; + } + } + + if (!$type) { + throw new \InvalidArgumentException(sprintf('Could not load type "%s"', $name)); + } + + $this->types[$type->getName()] = $type; + } + + return $this->types[$name]; + } + + /** + * Try to find a IoWrapperInterface registered with $name as alias + * + * @param string $id + * + * @return JobTypeInterface + */ + public function getWrapper($name) + { + if (!isset($this->wrappers[$name])) { + $wrapper = null; + + foreach ($this->extensions as $extension) { + if ($extension->hasWrapper($name)) { + $wrapper = $extension->getWrapper($name); + break; + } + } + + if (!$wrapper) { + throw new \InvalidArgumentException(sprintf('Could not load wrapper "%s"', $name)); + } + + $this->wrappers[$wrapper->getName()] = $wrapper; + } + + return $this->wrappers[$name]; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/JobStep.php b/src/Rezzza/JobFlow/JobStep.php new file mode 100644 index 0000000..d8cfff5 --- /dev/null +++ b/src/Rezzza/JobFlow/JobStep.php @@ -0,0 +1,16 @@ + + */ +class JobStep +{ + public $output; + + public function __construct($stdout) + { + $this->output = $stdout->getDsn(); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/JobTypeInterface.php b/src/Rezzza/JobFlow/JobTypeInterface.php new file mode 100644 index 0000000..0a44557 --- /dev/null +++ b/src/Rezzza/JobFlow/JobTypeInterface.php @@ -0,0 +1,36 @@ + + */ +interface JobTypeInterface +{ + /** + * Builds the job. + * + * This method is called for each type in the hierarchy starting form the + * top most type. + * + * @param JobBuilder $builder The job builder + * @param array $options The options + */ + public function buildJob(JobBuilder $builder, array $options); + + /** + * Sets the default options for this type. + * + * @param OptionsResolverInterface $resolver The resolver for the options. + */ + public function setDefaultOptions(OptionsResolverInterface $resolver); + + /** + * Returns the name of this type. + * + * @return string The name of this type + */ + public function getName(); +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Jobs.php b/src/Rezzza/JobFlow/Jobs.php new file mode 100644 index 0000000..59531aa --- /dev/null +++ b/src/Rezzza/JobFlow/Jobs.php @@ -0,0 +1,38 @@ +getJobFactory(); + } + + /** + * Creates a form factory builder with the default configuration. + * + * @return JobFactoryBuilder The job factory builder. + */ + public static function createJobFactoryBuilder() + { + $builder = new JobFactoryBuilder(); + $builder->addExtension(new CoreExtension()); + + return $builder; + } + + /** + * This class should not be instantiated. + */ + private function __construct() + { + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/ResolvedJob.php b/src/Rezzza/JobFlow/ResolvedJob.php new file mode 100644 index 0000000..f84b452 --- /dev/null +++ b/src/Rezzza/JobFlow/ResolvedJob.php @@ -0,0 +1,145 @@ + + */ +class ResolvedJob +{ + /** + * @var JobTypeInterface + */ + private $innerType; + + /** + * @var IoDescriptor + */ + private $io; + + /** + * @var ResolvedJob + */ + private $parent; + + /** + * @var OptionsResolverInterface + */ + private $optionsResolver; + + public function __construct(JobTypeInterface $innerType, IoDescriptor $io, ResolvedJob $parent = null) + { + $this->innerType = $innerType; + $this->io = $io; + $this->parent = $parent; + } + + /** + * Execute innerType + * + * @param JobContext $context + * + * @return boolean + */ + public function execute($input, $execution) + { + return $this->innerType->execute($input, $execution); + } + + /** + * @return [ResolvedJob|null] + */ + public function getParent() + { + return $this->parent; + } + + /** + * @return IoDescriptor + */ + public function getIo() + { + return $this->io; + } + + /** + * @return JobTypeInterface + */ + public function getInnerType() + { + return $this->innerType; + } + + /** + * Init options with innerType requirements + * + * @return OptionsResolver + */ + public function getOptionsResolver() + { + if (null === $this->optionsResolver) { + if (null !== $this->parent) { + $this->optionsResolver = clone $this->parent->getOptionsResolver(); + } else { + $this->optionsResolver = new OptionsResolver(); + } + + $this->innerType->setDefaultOptions($this->optionsResolver); + } + + return $this->optionsResolver; + } + + /** + * @param string $name + * @param JobFactory $factory + * @param array $options + * + * @return JobBuilder + */ + public function createBuilder($name, JobFactory $factory, array $options = array()) + { + $options = $this->getOptionsResolver()->resolve($options); + + $builder = $this->newBuilder($name, $factory, $options); + + $builder->setResolved($this); + + $this->buildJob($builder, $options); + + return $builder; + } + + /** + * Create new JobBuilder for the innerType + * + * @param string $name + * @param JobFactory $factory + * @param array $options + * + * @return JobBuilder + */ + protected function newBuilder($name, JobFactory $factory, array $options) + { + return new JobBuilder($name, $factory, $options); + } + + /** + * @param JobBuilder $builder + * @param array $options + */ + protected function buildJob(JobBuilder $builder, array $options) + { + if (null !== $this->parent) { + $this->parent->buildJob($builder, $options); + } + + $this->innerType->buildJob($builder, $options); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Resources/readme.md b/src/Rezzza/JobFlow/Resources/readme.md new file mode 100644 index 0000000..f26d44c --- /dev/null +++ b/src/Rezzza/JobFlow/Resources/readme.md @@ -0,0 +1,7 @@ +# Process + +If job command launched, we queue an initial message which contains only job id to execute. + +In this case we determine first job as the first child. + +After execution we write in context the next one. \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Scheduler/ExecutionContext.php b/src/Rezzza/JobFlow/Scheduler/ExecutionContext.php new file mode 100644 index 0000000..c6e6bc1 --- /dev/null +++ b/src/Rezzza/JobFlow/Scheduler/ExecutionContext.php @@ -0,0 +1,195 @@ + + */ +class ExecutionContext +{ + use LoggerAwareTrait; + + /** + * Current msg + * + * @var JobMessage + */ + public $msg; + + /** + * Representation of the navigation through the jobs + * + * @var RecursiveArrayIterator + */ + public $graph; + + /** + * Current job in execution + * + * @var JobInterface + */ + public $job; + + /** + * Current child job in execution + */ + public $child; + + /** + * @var array + */ + protected $options; + + /** + * @param JobInterface $job + * @param JobMessage $msg + * @param JobGraph $graph + * @param array $options + */ + public function __construct(JobInterface $job, JobMessage $msg, JobGraph $graph) + { + $this->job = $job; + $this->graph = $graph; + $this->msg = $msg; + $this->initCurrentJob(); + + $resolver = new OptionsResolver(); + $this->setDefaultOptions($resolver); + $this->options = $resolver->resolve($this->msg->context->options); + } + + /** + * Run execute on a job for the current msg. + * It will determine himself which child need to be execute + * + * @param JobInterface $job + */ + public function executeJob(JobInterface $job) + { + $this->addOptions($job->getOptions()); + + if (null === $this->getCurrentJob()) { + return; + } + + $this->child = $job->get($this->getCurrentJob()); + + $this->addOptions($this->child->getOptions()); + + // Run execution + $result = $this->child->execute($this->readData(), $this); + + return $this->writeData($result); + } + + /** + * Looks for data in Message. If no result, try to read input + * + * @return mixed + */ + public function readData() + { + if ($this->msg->hasData()) { + return $this->msg->getData(); + } + + return $this->child->getResolved()->getIo()->read(); + } + + /** + * Write data on current message and update context to follow the next job + * + * @param mixed $data + */ + public function writeData($data) + { + $msg = clone $this->msg; + + $msg->setData($data); + $msg->context->updateToNextJob($this->graph); + + return $msg; + } + + /** + * Get name of the child job in execution + * + * @return string + */ + public function getCurrentJob() + { + return $this->msg->context->current; + } + + /** + * @return string + */ + public function getJobId() + { + return $this->msg->context->jobId; + } + + /** + * At the begining get the first item of the graph + */ + public function initCurrentJob() + { + if ($this->msg->context->isStarting()) { + $this->msg->context->current = $this->graph->current(); + + return; + } + + $index = array_search($this->msg->context->current, $this->graph->getArrayCopy()); + $this->graph->seek($index); + } + + public function getOption($name) + { + return $this->options[$name]; + } + + public function setDefaultOptions(OptionsResolverInterface $resolver) + { + $resolver->setDefaults(array( + 'offset' => null, + 'limit' => null, + 'mapping' => null, + 'total' => null + )); + } + + public function addOptions(array $options) + { + $this->options = array_merge($this->options, $options); + } + + public function getOptions() + { + return $this->options; + } + + public function getLogger() + { + return $this->logger; + } + + public function getOutput() + { + return $this->child->getResolved()->getIo()->write(); + } + + public function setGlobalOption($key, $value) + { + $this->msg->context->options[$key] = $value; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Scheduler/JobGraph.php b/src/Rezzza/JobFlow/Scheduler/JobGraph.php new file mode 100644 index 0000000..30006ab --- /dev/null +++ b/src/Rezzza/JobFlow/Scheduler/JobGraph.php @@ -0,0 +1,71 @@ + + */ +class JobGraph implements \IteratorAggregate +{ + public $graph; + + public function __construct(ArrayIterator $graph) + { + $this->graph = $graph; + } + + public function current() + { + return $this->graph->current(); + } + + public function getArrayCopy() + { + return $this->graph->getArrayCopy(); + } + + public function seek($index) + { + return $this->graph->seek($index); + } + + /** + * @return boolean + */ + public function hasNextJob() + { + return $this->graph->offsetExists($this->graph->key() + 1); + } + + /** + * Get name of the next child job + * + * @return string + */ + public function getNextJob() + { + return $this->getJob($this->graph->key() + 1); + } + + /** + * Get name of the job for given index + * + * @param string|integer $index + * + * @return string + */ + public function getJob($index) + { + return $this->graph->offsetGet($index); + } + + /** + * @return Iterator + */ + public function getIterator() + { + return $this->graph; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Scheduler/JobScheduler.php b/src/Rezzza/JobFlow/Scheduler/JobScheduler.php new file mode 100644 index 0000000..deecb93 --- /dev/null +++ b/src/Rezzza/JobFlow/Scheduler/JobScheduler.php @@ -0,0 +1,205 @@ + + */ +class JobScheduler +{ + use LoggerAwareTrait; + + /** + * @var JobInterface + */ + protected $job; + + /** + * @var StrategyInterface + */ + protected $strategy; + + /** + * @var TransportInterface + */ + protected $transport; + + /** + * @param TransportInterface $transport + * @param StrategyInterface $strategy + */ + public function __construct(TransportInterface $transport, StrategyInterface $strategy = null) + { + $this->transport = $transport; + $this->strategy = $strategy; + } + + /** + * @return TransportInterface + */ + public function getStrategy() + { + return $this->strategy; + } + + /** + * @return StrategyInterface + */ + public function getTransport() + { + return $this->transport; + } + + /** + * @return JobInterface + */ + public function getJob() + { + return $this->job; + } + + public function getIo() + { + return $this->io; + } + + public function setIo(IoDescriptor $io) + { + $this->io = $io; + } + + /** + * @param JobInterface $job + * + * @return JobScheduler + */ + public function setJob(JobInterface $job) + { + $this->job = $job; + $this->buildGraph(); + $this->job->setLocked(true); + + return $this; + } + + public function init() + { + if (null === $this->getJob()) { + throw new \RuntimeException('You need to set a job'); + } + + $name = sprintf('%s.%s', $this->getJob()->getName(), $this->jobGraph->current()); + + $this->transport->addMessage($this->getInitMessage(), $name); + + return $this; + } + + public function run($msg = null) + { + if (null !== $msg) { + return $this->runJob($msg); + } + + while ($msg = $this->transport->getMessage()) { + $result = $this->runJob($msg); + } + + return $result; + } + + public function wait() + { + return $this->transport->getMessage(); + } + + public function addMessage(JobMessage $msg) + { + $this->transport->addMessage($msg, $msg->context->getMessageName()); + + return $this; + } + + protected function runJob($msg) + { + if (!$msg instanceof JobMessage) { + return; + } + + $context = new ExecutionContext( + $this->job, + $msg, + $this->jobGraph + ); + + if (null !== $this->logger) { + $context->setLogger($this->logger); + } + + return $this->transport->store($context->executeJob($this->job)); + } + + private function getInitMessage() + { + return new JobMessage(new JobContext($this->getJob()->getName())); + } + + /** + * Build a graph on children execution order + */ + private function buildGraph() + { + $children = $this->getJob()->getChildren(); + + /*uasort($children, function($a, $b) { + $stdinA = $a->getResolved()->getIo()->stdin; + $stdinB = $b->getResolved()->getIo()->stdin; + + if ($stdinA && $stdinA->isFirstStep()) { + return -1; + } + + if ($stdinB && $stdinB->isFirstStep()) { + return 1; + } + + if ($stdinB && $stdinB->isConnectedTo($a->getName())) { + return -1; + } + + if ($stdinA && $stdinB && $stdinA->getDsn() === $stdinB->getDsn()) { + return 0; + } + + if ($stdinA && $stdinA->isConnectedTo($b->getName())) { + return 1; + } + }); + + $i = 0; + foreach ($children as $child) { + $stdin = $child->getResolved()->getIo()->stdin; + + if ($i++ > 0) { + if (false === $stdin->isConnectedTo($last->getName())) { + throw new \RuntimeException('Job Graph is not consistent'); + } + } + + $last = $child; + }*/ + + $this->jobGraph = new JobGraph(new \ArrayIterator(array_keys($children))); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Scheduler/Strategy/DirectStrategy.php b/src/Rezzza/JobFlow/Scheduler/Strategy/DirectStrategy.php new file mode 100644 index 0000000..cb4a4c5 --- /dev/null +++ b/src/Rezzza/JobFlow/Scheduler/Strategy/DirectStrategy.php @@ -0,0 +1,8 @@ +messages[] = $msg; + + return $this; + } + + public function getMessage() + { + return array_shift($this->messages); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Scheduler/Transport/PhpTransport.php b/src/Rezzza/JobFlow/Scheduler/Transport/PhpTransport.php new file mode 100644 index 0000000..9f2195c --- /dev/null +++ b/src/Rezzza/JobFlow/Scheduler/Transport/PhpTransport.php @@ -0,0 +1,15 @@ +addMessage($result); + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Scheduler/Transport/RabbitMqTransport.php b/src/Rezzza/JobFlow/Scheduler/Transport/RabbitMqTransport.php new file mode 100644 index 0000000..a0e45e6 --- /dev/null +++ b/src/Rezzza/JobFlow/Scheduler/Transport/RabbitMqTransport.php @@ -0,0 +1,30 @@ +rpcClient = $rpcClient; + } + + public function addMessage($msg, $name = null) + { + $this->rpcClient->addRequest(serialize($msg), 'job', $name); + } + + public function getMessage() + { + $this->rpcClient->getReplies(); + } + + public function store($result) + { + return $result; + } +} \ No newline at end of file diff --git a/src/Rezzza/JobFlow/Scheduler/Transport/TransportInterface.php b/src/Rezzza/JobFlow/Scheduler/Transport/TransportInterface.php new file mode 100644 index 0000000..2b53dd1 --- /dev/null +++ b/src/Rezzza/JobFlow/Scheduler/Transport/TransportInterface.php @@ -0,0 +1,10 @@ +mockGenerator->orphanize('__construct'); + + $this + ->if($mockResolved = new \mock\Rezzza\JobFlow\ResolvedJob) + ->and($job = new TestedClass('test', $mockResolved)) + ->and($child = new TestedClass('child', $mockResolved)) + ->then($job->add($child)) + ->object($job->get('child'))->isIdenticalTo($child) + ; + } + + public function test_should_throw_exception_on_incorrect_name() + { + $this->mockGenerator->orphanize('__construct'); + + $this + ->if($mockResolved = new \mock\Rezzza\JobFlow\ResolvedJob) + ->then($job = new TestedClass('test', $mockResolved)) + ->exception(function() use ($job) { + $job->get('test'); + }) + ->hasMessage('No child with name : "test" in job "test"') + ; + } +}