Skip to content

Commit

Permalink
Merge pull request #23 from php-http/feature/rewrite-promise
Browse files Browse the repository at this point in the history
Rewrite promise
  • Loading branch information
joelwurtz committed Aug 31, 2017
2 parents b83b4d1 + dca54bf commit 175f973
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 57 deletions.
18 changes: 7 additions & 11 deletions src/Client.php
Expand Up @@ -14,7 +14,6 @@
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\StreamInterface;
use React\EventLoop\LoopInterface;
use React\Promise\Deferred;
use React\HttpClient\Client as ReactClient;
use React\HttpClient\Request as ReactRequest;
use React\HttpClient\Response as ReactResponse;
Expand Down Expand Up @@ -93,45 +92,42 @@ public function sendRequest(RequestInterface $request)
public function sendAsyncRequest(RequestInterface $request)
{
$reactRequest = $this->buildReactRequest($request);
$deferred = new Deferred();
$promise = new Promise($this->loop);

$reactRequest->on('error', function (\Exception $error) use ($deferred, $request) {
$deferred->reject(new RequestException(
$reactRequest->on('error', function (\Exception $error) use ($promise, $request) {
$promise->reject(new RequestException(
$error->getMessage(),
$request,
$error
));
});

$reactRequest->on('response', function (ReactResponse $reactResponse = null) use ($deferred, $reactRequest, $request) {
$reactRequest->on('response', function (ReactResponse $reactResponse = null) use ($promise, $request) {
$bodyStream = $this->streamFactory->createStream();
$reactResponse->on('data', function ($data) use (&$bodyStream) {
$bodyStream->write((string) $data);
});

$reactResponse->on('end', function (\Exception $error = null) use ($deferred, $request, $reactResponse, &$bodyStream) {
$reactResponse->on('end', function (\Exception $error = null) use ($promise, $request, $reactResponse, &$bodyStream) {
$response = $this->buildResponse(
$reactResponse,
$bodyStream
);
if (null !== $error) {
$deferred->reject(new HttpException(
$promise->reject(new HttpException(
$error->getMessage(),
$request,
$response,
$error
));
} else {
$deferred->resolve($response);
$promise->resolve($response);
}
});
});

$reactRequest->end((string) $request->getBody());

$promise = new Promise($deferred->promise());
$promise->setLoop($this->loop);

return $promise;
}

Expand Down
142 changes: 96 additions & 46 deletions src/Promise.php
Expand Up @@ -3,7 +3,6 @@
namespace Http\Adapter\React;

use React\EventLoop\LoopInterface;
use React\Promise\PromiseInterface as ReactPromise;
use Http\Client\Exception;
use Http\Promise\Promise as HttpPromise;
use Psr\Http\Message\ResponseInterface;
Expand All @@ -12,8 +11,10 @@
* React promise adapter implementation.
*
* @author Stéphane Hulard <stephane@hlrd.me>
*
* @internal
*/
class Promise implements HttpPromise
final class Promise implements HttpPromise
{
/**
* Promise status.
Expand All @@ -22,13 +23,6 @@ class Promise implements HttpPromise
*/
private $state = HttpPromise::PENDING;

/**
* Adapted React promise.
*
* @var ReactPromise
*/
private $promise;

/**
* PSR7 received response.
*
Expand All @@ -43,31 +37,26 @@ class Promise implements HttpPromise
*/
private $exception;

/**
* @var callable|null
*/
private $onFulfilled;

/**
* @var callable|null
*/
private $onRejected;

/**
* React Event Loop used for synchronous processing.
*
* @var LoopInterface
*/
private $loop;

/**
* Initialize the promise.
*
* @param ReactPromise $promise
*/
public function __construct(ReactPromise $promise)
public function __construct(LoopInterface $loop)
{
$promise->then(
function (ResponseInterface $response) {
$this->state = HttpPromise::FULFILLED;
$this->response = $response;
},
function (Exception $error) {
$this->state = HttpPromise::REJECTED;
$this->exception = $error;
}
);
$this->promise = $promise;
$this->loop = $loop;
}

/**
Expand All @@ -80,49 +69,110 @@ function (Exception $error) {
*/
public function then(callable $onFulfilled = null, callable $onRejected = null)
{
$this->promise->then(function () use ($onFulfilled) {
if (null !== $onFulfilled) {
call_user_func($onFulfilled, $this->response);
$newPromise = new self($this->loop);

$onFulfilled = $onFulfilled !== null ? $onFulfilled : function (ResponseInterface $response) {
return $response;
};

$onRejected = $onRejected !== null ? $onRejected : function (Exception $exception) {
throw $exception;
};

$this->onFulfilled = function (ResponseInterface $response) use ($onFulfilled, $newPromise) {
try {
$newPromise->resolve($onFulfilled($response));
} catch (Exception $exception) {
$newPromise->reject($exception);
}
}, function () use ($onRejected) {
if (null !== $onRejected) {
call_user_func($onRejected, $this->exception);
};

$this->onRejected = function (Exception $exception) use ($onRejected, $newPromise) {
try {
$newPromise->resolve($onRejected($exception));
} catch (Exception $exception) {
$newPromise->reject($exception);
}
});
};

return $this;
if ($this->state === HttpPromise::FULFILLED) {
$this->doResolve($this->response);
}

if ($this->state === HttpPromise::REJECTED) {
$this->doReject($this->exception);
}

return $newPromise;
}

/**
* {@inheritdoc}
* Resolve this promise.
*
* @param ResponseInterface $response
*
* @internal
*/
public function getState()
public function resolve(ResponseInterface $response)
{
return $this->state;
if ($this->state !== HttpPromise::PENDING) {
throw new \RuntimeException('Promise is already resolved');
}

$this->state = HttpPromise::FULFILLED;
$this->response = $response;
$this->doResolve($response);
}

private function doResolve(ResponseInterface $response)
{
$onFulfilled = $this->onFulfilled;

if (null !== $onFulfilled) {
$onFulfilled($response);
}
}

/**
* Set EventLoop used for synchronous processing.
* Reject this promise.
*
* @param LoopInterface $loop
* @param Exception $exception
*
* @return Promise
* @internal
*/
public function setLoop(LoopInterface $loop)
public function reject(Exception $exception)
{
$this->loop = $loop;
if ($this->state !== HttpPromise::PENDING) {
throw new \RuntimeException('Promise is already resolved');
}

$this->state = HttpPromise::REJECTED;
$this->exception = $exception;
$this->doReject($exception);
}

private function doReject(Exception $exception)
{
$onRejected = $this->onRejected;

return $this;
if (null !== $onRejected) {
$onRejected($exception);
}
}

/**
* {@inheritdoc}
*/
public function getState()
{
return $this->state;
}

/**
* {@inheritdoc}
*/
public function wait($unwrap = true)
{
if (null === $this->loop) {
throw new \LogicException('You must set the loop before wait!');
}
while (HttpPromise::PENDING === $this->getState()) {
$this->loop->tick();
}
Expand Down
34 changes: 34 additions & 0 deletions tests/PromiseTest.php
@@ -0,0 +1,34 @@
<?php

namespace Http\Adapter\React\Tests;

use GuzzleHttp\Psr7\Response;
use Http\Adapter\React\Promise;
use Http\Adapter\React\ReactFactory;
use PHPUnit\Framework\TestCase;

class PromiseTest extends TestCase
{
private $loop;

public function setUp()
{
$this->loop = ReactFactory::buildEventLoop();
}

public function testChain()
{
$promise = new Promise($this->loop);
$response = new Response(200);

$lastPromise = $promise->then(function (Response $response) {
return $response->withStatus(300);
});

$promise->resolve($response);
$updatedResponse = $lastPromise->wait();

self::assertEquals(200, $response->getStatusCode());
self::assertEquals(300, $updatedResponse->getStatusCode());
}
}

0 comments on commit 175f973

Please sign in to comment.