Skip to content
Browse files

initial commit

  • Loading branch information...
0 parents commit d3c1421b3abfe7a3d63abe721e6fe497b11b8ac4 @kriswallsmith committed Apr 9, 2012
4 .gitignore
@@ -0,0 +1,4 @@
+composer.lock
+composer.phar
+phpunit.xml
+vendor/
20 composer.json
@@ -0,0 +1,20 @@
+{
+ "name": "kriswallsmith/spork",
+ "description": "Asynchronous PHP",
+ "homepage": "https://github.com/kriswallsmith/spork",
+ "type": "library",
+ "license": "MIT",
+ "authors": [
+ {
+ "name": "Kris Wallsmith",
+ "email": "kris.wallsmith@gmail.com",
+ "homepage": "http://kriswallsmith.net/"
+ }
+ ],
+ "require": {
+ "php": ">=5.3.0"
+ },
+ "autoload": {
+ "psr-0": { "Spork": "src/" }
+ }
+}
15 phpunit.xml.dist
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<phpunit bootstrap="./tests/bootstrap.php" colors="true">
+ <testsuites>
+ <testsuite name="Spork Test Suite">
+ <directory suffix="Test.php">./tests/Spork/Test/</directory>
+ </testsuite>
+ </testsuites>
+
+ <filter>
+ <whitelist>
+ <directory suffix=".php">./src/Spork/</directory>
+ </whitelist>
+ </filter>
+</phpunit>
140 src/Spork/Deferred/Deferred.php
@@ -0,0 +1,140 @@
+<?php
+
+namespace Spork\Deferred;
+
+use Spork\Exception\UnexpectedTypeException;
+
+class Deferred implements DeferredInterface
+{
+ private $state;
+ private $alwaysCallbacks;
+ private $doneCallbacks;
+ private $failCallbacks;
+ private $callbackArgs;
+
+ public function __construct()
+ {
+ $this->state = DeferredInterface::STATE_PENDING;
+
+ $this->alwaysCallbacks = array();
+ $this->doneCallbacks = array();
+ $this->failCallbacks = array();
+ }
+
+ public function getState()
+ {
+ return $this->state;
+ }
+
+ public function always($alwaysCallback)
+ {
+ if (!is_callable($alwaysCallback)) {
+ throw new UnexpectedTypeException($alwaysCallback, 'callable');
+ }
+
+ switch ($this->state) {
@bobthecow
bobthecow added a note Apr 10, 2012

Feels like there's room for a race here (and in the other switches on state). Do you need a mutex?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ case DeferredInterface::STATE_PENDING:
+ $this->alwaysCallbacks[] = $alwaysCallback;
+ break;
+ case DeferredInterface::STATE_RESOLVED:
+ call_user_func_array($alwaysCallback, $this->callbackArgs);
+ break;
+ }
+
+ return $this;
+ }
+
+ public function done($doneCallback)
+ {
+ if (!is_callable($doneCallback)) {
+ throw new UnexpectedTypeException($doneCallback, 'callable');
+ }
+
+ switch ($this->state) {
+ case DeferredInterface::STATE_PENDING:
+ $this->doneCallbacks[] = $doneCallback;
+ break;
+ default:
+ call_user_func_array($doneCallback, $this->callbackArgs);
+ }
+
+ return $this;
+ }
+
+ public function fail($failCallback)
+ {
+ if (!is_callable($failCallback)) {
+ throw new UnexpectedTypeException($failCallback, 'callable');
+ }
+
+ switch ($this->state) {
+ case DeferredInterface::STATE_PENDING:
+ $this->failCallbacks[] = $failCallback;
+ break;
+ case DeferredInterface::STATE_REJECTED:
+ call_user_func_array($failCallback, $this->callbackArgs);
+ break;
+ }
+
+ return $this;
+ }
+
+ public function then($doneCallback, $failCallback = null)
+ {
+ $this->done($doneCallback);
+
+ if ($failCallback) {
+ $this->fail($failCallback);
+ }
+
+ return $this;
+ }
+
+ public function resolve()
+ {
+ if (DeferredInterface::STATE_REJECTED === $this->state) {
+ throw new \LogicException('Cannot resolve a deferred object that has already been rejected');
+ }
+
+ if (DeferredInterface::STATE_RESOLVED === $this->state) {
+ return;
+ }
+
+ $this->state = DeferredInterface::STATE_RESOLVED;
+ $this->callbackArgs = func_get_args();
+
+ while ($func = array_shift($this->alwaysCallbacks)) {
+ call_user_func_array($func, $this->callbackArgs);
+ }
+
+ while ($func = array_shift($this->doneCallbacks)) {
+ call_user_func_array($func, $this->callbackArgs);
+ }
+
+ return $this;
+ }
+
+ public function reject()
+ {
+ if (DeferredInterface::STATE_RESOLVED === $this->state) {
+ throw new \LogicException('Cannot reject a deferred object that has already been resolved');
+ }
+
+ if (DeferredInterface::STATE_REJECTED === $this->state) {
+ return;
+ }
+
+ $this->state = DeferredInterface::STATE_REJECTED;
+ $this->callbackArgs = func_get_args();
+
+ while ($func = array_shift($this->alwaysCallbacks)) {
+ call_user_func_array($func, $this->callbackArgs);
+ }
+
+ while ($func = array_shift($this->failCallbacks)) {
+ call_user_func_array($func, $this->callbackArgs);
+ }
+
+ return $this;
+ }
+}
79 src/Spork/Deferred/DeferredAggregate.php
@@ -0,0 +1,79 @@
+<?php
+
+namespace Spork\Deferred;
+
+use Spork\Exception\UnexpectedTypeException;
+
+class DeferredAggregate implements PromiseInterface
+{
+ private $children;
+ private $delegate;
+
+ public function __construct(array $children)
+ {
+ // connect to each deferred
+ foreach ($children as $child) {
+ if (!$child instanceof PromiseInterface) {
+ throw new UnexpectedTypeException($child, 'Spork\PromiseInterface');
@stof
stof added a note Apr 10, 2012

the namespace of the interface is wrong here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ }
+
+ $child->always(array($this, 'tick'));
+ }
+
+ $this->children = $children;
+ $this->delegate = new Deferred();
+ }
+
+ public function getState()
+ {
+ return $this->delegate->getState();
+ }
+
+ public function always($alwaysCallback)
+ {
+ $this->delegate->always($alwaysCallback);
+
+ return $this;
+ }
+
+ public function done($doneCallback)
+ {
+ $this->delegate->done($doneCallback);
+
+ return $this;
+ }
+
+ public function fail($failCallback)
+ {
+ $this->delegate->fail($failCallback);
+
+ return $this;
+ }
+
+ public function then($doneCallback, $failCallback = null)
+ {
+ $this->delegate->then($doneCallback, $failCallback);
+
+ return $this;
+ }
+
+ public function tick()
+ {
+ $pending = count($this->children);
+
+ foreach ($this->children as $child) {
+ switch ($child->getState()) {
+ case PromiseInterface::STATE_REJECTED:
+ $this->delegate->reject();
+ return;
+ case PromiseInterface::STATE_RESOLVED:
+ --$pending;
+ break;
+ }
+ }
+
+ if (!$pending) {
+ $this->delegate->resolve();
+ }
+ }
+}
11 src/Spork/Deferred/DeferredFactory.php
@@ -0,0 +1,11 @@
+<?php
+
+namespace Spork\Deferred;
+
+class DeferredFactory implements FactoryInterface
+{
+ public function createDeferred()
+ {
+ return new Deferred();
+ }
+}
9 src/Spork/Deferred/DeferredInterface.php
@@ -0,0 +1,9 @@
+<?php
+
+namespace Spork\Deferred;
+
+interface DeferredInterface extends PromiseInterface
+{
+ function resolve();
+ function reject();
+}
8 src/Spork/Deferred/FactoryInterface.php
@@ -0,0 +1,8 @@
+<?php
+
+namespace Spork\Deferred;
+
+interface FactoryInterface
+{
+ function createDeferred();
+}
16 src/Spork/Deferred/PromiseInterface.php
@@ -0,0 +1,16 @@
+<?php
+
+namespace Spork\Deferred;
+
+interface PromiseInterface
+{
+ const STATE_PENDING = 'pending';
+ const STATE_RESOLVED = 'resolved';
+ const STATE_REJECTED = 'rejected';
+
+ function getState();
+ function always($alwaysCallback);
+ function done($doneCallback);
+ function fail($failCallback);
+ function then($doneCallback, $failCallback = null);
+}
7 src/Spork/Exception/ProcessControlException.php
@@ -0,0 +1,7 @@
+<?php
+
+namespace Spork\Exception;
+
+class ProcessControlException extends \RuntimeException
+{
+}
11 src/Spork/Exception/UnexpectedTypeException.php
@@ -0,0 +1,11 @@
+<?php
+
+namespace Spork\Exception;
+
+class UnexpectedTypeException extends \LogicException
+{
+ public function __construct($value, $expectedType)
+ {
+ parent::__construct(sprintf('Expected argument of type "%s", "%s" given', $expectedType, is_object($value) ? get_class($value) : gettype($value)));
+ }
+}
113 src/Spork/ProcessManager.php
@@ -0,0 +1,113 @@
+<?php
+
+declare(ticks=1);
+
+namespace Spork;
+
+use Spork\Deferred\DeferredInterface;
+use Spork\Deferred\FactoryInterface;
+use Spork\Exception\ProcessControlException;
+use Spork\Exception\UnexpectedTypeException;
+
+class ProcessManager
+{
+ private $factory;
+ private $defers;
+
+ public function __construct(FactoryInterface $factory)
+ {
+ $this->factory = $factory;
+ $this->defers = array();
+
+ pcntl_signal(SIGCHLD, array($this, 'waitNoHang'));
+ }
+
+ public function __clone()
+ {
+ $this->defers = array();
+ }
+
+ public function __destruct()
+ {
+ $this->wait();
+ }
+
+ /**
+ * Forks something into another process and returns a deferred object.
+ */
+ public function fork($callable, array $arguments = array())
+ {
+ if (!is_callable($callable)) {
+ throw new UnexpectedTypeException($callable, 'callable');
+ }
+
+ $pid = pcntl_fork();
+
+ if (-1 === $pid) {
+ throw new ProcessControlException('Unable to fork a new process');
+ }
+
+ if (0 === $pid) {
+ // reset the stack of defers
+ $this->defers = array();
+
+ ob_start();
+
+ try {
+ call_user_func_array($callable, $arguments);
+ $statusCode = 0;
+ } catch (\Exception $e) {
+ $statusCode = 1;
+ }
+
+ // dump the output to a file
+ file_put_contents($this->getOutputFile(posix_getpid()), ob_get_clean());
+
+ exit($statusCode);
+ }
+
+ return $this->defers[$pid] = $this->factory->createDeferred();
+ }
+
+ /**
+ * Waits for all child processes to exit.
+ */
+ public function wait($hang = true)
+ {
+ foreach ($this->defers as $pid => $defer) {
+ if (DeferredInterface::STATE_PENDING !== $defer->getState()) {
+ continue;
+ }
+
+ $wait = pcntl_waitpid($pid, $status, $hang ? 0 : WNOHANG);
+
+ if ($wait < 1) {
+ continue;
+ }
+
+ if (file_exists($file = $this->getOutputFile($pid))) {
+ $output = file_get_contents($file);
+ unlink($file);
+ } else {
+ $output = null;
+ }
+
+ $statusCode = pcntl_wexitstatus($status);
+ if (0 === $statusCode) {
+ $defer->resolve($output, $statusCode, $status);
+ } else {
+ $defer->reject($output, $statusCode, $status);
+ }
+ }
+ }
+
+ public function waitNoHang()
+ {
+ $this->wait(false);
+ }
+
+ private function getOutputFile($pid)
+ {
+ return realpath(sys_get_temp_dir()).'/spork_'.$pid.'.out';
+ }
+}
54 tests/Spork/Test/ProcessManagerTest.php
@@ -0,0 +1,54 @@
+<?php
+
+namespace Spork\Test;
+
+use Spork\Deferred\DeferredFactory;
+use Spork\ProcessManager;
+
+class ProcessManagerTest extends \PHPUnit_Framework_TestCase
+{
+ private $manager;
+
+ protected function setUp()
+ {
+ $this->manager = new ProcessManager(new DeferredFactory());
+ }
+
+ public function testDoneCallbacks()
+ {
+ $log = array();
+
+ $this->manager->fork(function() use(& $log) {
+ echo 'child';
+ })->always(function($output, $status) use(& $log) {
+ $log[] = $output;
+ })->done(function($output, $status) use(& $log) {
+ $log[] = 'done';
+ })->fail(function($output, $status) use(& $log) {
+ $log[] = 'fail';
+ });
+
+ $this->manager->wait();
+
+ $this->assertEquals(array('child', 'done'), $log);
+ }
+
+ public function testFailCallbacks()
+ {
+ $log = array();
+
+ $this->manager->fork(function() use(& $log) {
+ throw new \Exception('child fail');
+ })->always(function($output, $status) use(& $log) {
+ $log[] = $output;
+ })->done(function($output, $status) use(& $log) {
+ $log[] = 'done';
+ })->fail(function($output, $status) use(& $log) {
+ $log[] = 'fail';
+ });
+
+ $this->manager->wait();
+
+ $this->assertEquals(array('', 'fail'), $log);
+ }
+}
9 tests/bootstrap.php
@@ -0,0 +1,9 @@
+<?php
+
+if (!$loader = @include __DIR__.'/../vendor/.composer/autoload.php') {
+ die('You must set up the project dependencies, run the following commands:'.PHP_EOL.
+ 'curl -s http://getcomposer.org/installer | php'.PHP_EOL.
+ 'php composer.phar install'.PHP_EOL);
+}
+
+$loader->add('Spork\Test', __DIR__);

0 comments on commit d3c1421

Please sign in to comment.
Something went wrong with that request. Please try again.