Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HttpClient] Async HTTPlug client #33743

Merged
merged 1 commit into from Oct 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Expand Up @@ -108,6 +108,7 @@
"doctrine/orm": "~2.4,>=2.4.5",
"doctrine/reflection": "~1.0",
"doctrine/doctrine-bundle": "~1.4",
"guzzlehttp/promises": "^1.3.1",
"masterminds/html5": "^2.6",
"monolog/monolog": "^1.25.1",
"nyholm/psr7": "^1.0",
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/HttpClient/CHANGELOG.md
Expand Up @@ -5,7 +5,7 @@ CHANGELOG
-----

* added `StreamWrapper`
* added `HttplugClient`
* added `HttplugClient` with support for sync and async requests
* added `max_duration` option
* added support for NTLM authentication
* added `$response->toStream()` to cast responses to regular PHP streams
Expand Down
254 changes: 221 additions & 33 deletions src/Symfony/Component/HttpClient/HttplugClient.php
Expand Up @@ -11,74 +11,205 @@

namespace Symfony\Component\HttpClient;

use GuzzleHttp\Promise\Promise as GuzzlePromise;
use Http\Client\Exception\NetworkException;
use Http\Client\Exception\RequestException;
use Http\Client\HttpClient;
use Http\Client\HttpAsyncClient;
use Http\Client\HttpClient as HttplugInterface;
use Http\Message\RequestFactory;
use Http\Message\StreamFactory;
use Http\Message\UriFactory;
use Psr\Http\Client\ClientInterface;
use Psr\Http\Client\NetworkExceptionInterface;
use Psr\Http\Client\RequestExceptionInterface;
use Http\Promise\Promise;
use Http\Promise\RejectedPromise;
use Nyholm\Psr7\Factory\Psr17Factory;
use Nyholm\Psr7\Request;
use Nyholm\Psr7\Uri;
use Psr\Http\Message\RequestFactoryInterface;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseFactoryInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ResponseInterface as Psr7ResponseInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Psr\Http\Message\StreamInterface;
use Psr\Http\Message\UriFactoryInterface;
use Psr\Http\Message\UriInterface;
use Symfony\Component\HttpClient\Response\HttplugPromise;
use Symfony\Component\HttpClient\Response\ResponseTrait;
use Symfony\Component\HttpClient\Response\StreamWrapper;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;

if (!interface_exists(HttpClient::class)) {
if (!interface_exists(HttplugInterface::class)) {
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/httplug" package is not installed. Try running "composer require php-http/httplug".');
}

if (!interface_exists(ClientInterface::class)) {
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "psr/http-client" package is not installed. Try running "composer require psr/http-client".');
}

if (!interface_exists(RequestFactory::class)) {
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\HttplugClient" as the "php-http/message-factory" package is not installed. Try running "composer require nyholm/psr7".');
}

/**
* An adapter to turn a Symfony HttpClientInterface into an Httplug client.
*
* Run "composer require psr/http-client" to install the base ClientInterface. Run
* "composer require nyholm/psr7" to install an efficient implementation of response
* Run "composer require nyholm/psr7" to install an efficient implementation of response
* and stream factories with flex-provided autowiring aliases.
*
* @author Nicolas Grekas <p@tchwork.com>
*/
final class HttplugClient implements HttpClient, RequestFactory, StreamFactory, UriFactory
final class HttplugClient implements HttplugInterface, HttpAsyncClient, RequestFactory, StreamFactory, UriFactory
{
private $client;
private $responseFactory;
private $streamFactory;
private $promisePool = [];
private $pendingResponse;

public function __construct(HttpClientInterface $client = null, ResponseFactoryInterface $responseFactory = null, StreamFactoryInterface $streamFactory = null)
{
$this->client = new Psr18Client($client, $responseFactory, $streamFactory);
$this->client = $client ?? HttpClient::create();
$this->responseFactory = $responseFactory;
$this->streamFactory = $streamFactory ?? ($responseFactory instanceof StreamFactoryInterface ? $responseFactory : null);
$this->promisePool = new \SplObjectStorage();

if (null !== $this->responseFactory && null !== $this->streamFactory) {
return;
}

if (!class_exists(Psr17Factory::class)) {
throw new \LogicException('You cannot use the "Symfony\Component\HttpClient\HttplugClient" as no PSR-17 factories have been provided. Try running "composer require nyholm/psr7".');
}

$psr17Factory = new Psr17Factory();
$this->responseFactory = $this->responseFactory ?? $psr17Factory;
$this->streamFactory = $this->streamFactory ?? $psr17Factory;
}

/**
* {@inheritdoc}
*/
public function sendRequest(RequestInterface $request): ResponseInterface
public function sendRequest(RequestInterface $request): Psr7ResponseInterface
{
try {
return $this->client->sendRequest($request);
} catch (RequestExceptionInterface $e) {
throw new RequestException($e->getMessage(), $request, $e);
} catch (NetworkExceptionInterface $e) {
return $this->createPsr7Response($this->sendPsr7Request($request));
} catch (TransportExceptionInterface $e) {
throw new NetworkException($e->getMessage(), $request, $e);
}
}

/**
* {@inheritdoc}
*
* @return HttplugPromise
*/
public function sendAsyncRequest(RequestInterface $request): Promise
{
if (!class_exists(GuzzlePromise::class)) {
throw new \LogicException(sprintf('You cannot use "%s()" as the "guzzlehttp/promises" package is not installed. Try running "composer require guzzlehttp/promises".', __METHOD__));
}

try {
$response = $this->sendPsr7Request($request, true);
} catch (NetworkException $e) {
return new RejectedPromise($e);
}

$cancel = function () use ($response) {
$response->cancel();
unset($this->promisePool[$response]);
};

$promise = new GuzzlePromise(function () use ($response) {
$this->pendingResponse = $response;
$this->wait();
}, $cancel);

$this->promisePool[$response] = [$request, $promise];

return new HttplugPromise($promise, $cancel);
}

/**
* Resolve pending promises that complete before the timeouts are reached.
*
* When $maxDuration is null and $idleTimeout is reached, promises are rejected.
*
* @return int The number of remaining pending promises
*/
public function wait(float $maxDuration = null, float $idleTimeout = null): int
{
$pendingResponse = $this->pendingResponse;
$this->pendingResponse = null;

if (null !== $maxDuration) {
$startTime = microtime(true);
$idleTimeout = max(0.0, min($maxDuration / 5, $idleTimeout ?? $maxDuration));
$remainingDuration = $maxDuration;
}

do {
foreach ($this->client->stream($this->promisePool, $idleTimeout) as $response => $chunk) {
try {
if (null !== $maxDuration && $chunk->isTimeout()) {
goto check_duration;
}

if ($chunk->isFirst()) {
// Deactivate throwing on 3/4/5xx
$response->getStatusCode();
}

if (!$chunk->isLast()) {
goto check_duration;
}

if ([$request, $promise] = $this->promisePool[$response] ?? null) {
unset($this->promisePool[$response]);
$promise->resolve($this->createPsr7Response($response, true));
}
} catch (\Exception $e) {
if ([$request, $promise] = $this->promisePool[$response] ?? null) {
unset($this->promisePool[$response]);

if ($e instanceof TransportExceptionInterface) {
$e = new NetworkException($e->getMessage(), $request, $e);
}

$promise->reject($e);
}
}

if ($pendingResponse === $response) {
return \count($this->promisePool);
}

check_duration:
if (null !== $maxDuration && $idleTimeout && $idleTimeout > $remainingDuration = max(0.0, $maxDuration - microtime(true) + $startTime)) {
$idleTimeout = $remainingDuration / 5;
break;
}
}

if (!$count = \count($this->promisePool)) {
return 0;
}
} while (null !== $maxDuration && 0 < $remainingDuration);

return $count;
}

/**
* {@inheritdoc}
*/
public function createRequest($method, $uri, array $headers = [], $body = null, $protocolVersion = '1.1'): RequestInterface
{
$request = $this->client
->createRequest($method, $uri)
if ($this->responseFactory instanceof RequestFactoryInterface) {
$request = $this->responseFactory->createRequest($method, $uri);
} elseif (!class_exists(Request::class)) {
throw new \LogicException(sprintf('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7".', __METHOD__));
} else {
$request = new Request($method, $uri);
}

$request = $request
->withProtocolVersion($protocolVersion)
->withBody($this->createStream($body))
;
Expand All @@ -100,27 +231,84 @@ public function createStream($body = null): StreamInterface
}

if (\is_string($body ?? '')) {
$body = $this->client->createStream($body ?? '');

if ($body->isSeekable()) {
$body->seek(0);
}

return $body;
$stream = $this->streamFactory->createStream($body ?? '');
} elseif (\is_resource($body)) {
$stream = $this->streamFactory->createStreamFromResource($body);
} else {
throw new \InvalidArgumentException(sprintf('%s() expects string, resource or StreamInterface, %s given.', __METHOD__, \gettype($body)));
}

if (\is_resource($body)) {
return $this->client->createStreamFromResource($body);
if ($stream->isSeekable()) {
$stream->seek(0);
}

throw new \InvalidArgumentException(sprintf('%s() expects string, resource or StreamInterface, %s given.', __METHOD__, \gettype($body)));
return $stream;
}

/**
* {@inheritdoc}
*/
public function createUri($uri = ''): UriInterface
public function createUri($uri): UriInterface
{
return $uri instanceof UriInterface ? $uri : $this->client->createUri($uri);
if ($uri instanceof UriInterface) {
return $uri;
}

if ($this->responseFactory instanceof UriFactoryInterface) {
return $this->responseFactory->createUri($uri);
}

if (!class_exists(Uri::class)) {
throw new \LogicException(sprintf('You cannot use "%s()" as the "nyholm/psr7" package is not installed. Try running "composer require nyholm/psr7".', __METHOD__));
}

return new Uri($uri);
}

private function sendPsr7Request(RequestInterface $request, bool $buffer = null): ResponseInterface
{
try {
$body = $request->getBody();

if ($body->isSeekable()) {
$body->seek(0);
}

return $this->client->request($request->getMethod(), (string) $request->getUri(), [
'headers' => $request->getHeaders(),
'body' => $body->getContents(),
'http_version' => '1.0' === $request->getProtocolVersion() ? '1.0' : null,
'buffer' => $buffer,
]);
} catch (\InvalidArgumentException $e) {
throw new RequestException($e->getMessage(), $request, $e);
} catch (TransportExceptionInterface $e) {
throw new NetworkException($e->getMessage(), $request, $e);
}
}

private function createPsr7Response(ResponseInterface $response, bool $buffer = false): Psr7ResponseInterface
{
$psrResponse = $this->responseFactory->createResponse($response->getStatusCode());

foreach ($response->getHeaders(false) as $name => $values) {
foreach ($values as $value) {
$psrResponse = $psrResponse->withAddedHeader($name, $value);
}
}

if (isset(class_uses($response)[ResponseTrait::class])) {
$body = $this->streamFactory->createStreamFromResource($response->toStream(false));
} elseif (!$buffer) {
$body = $this->streamFactory->createStreamFromResource(StreamWrapper::createResource($response, $this->client));
} else {
$body = $this->streamFactory->createStream($response->getContent(false));
}

if ($body->isSeekable()) {
$body->seek(0);
}

return $psrResponse->withBody($body);
}
}