Skip to content

Commit

Permalink
[HttpClient] Add portable HTTP/2 implementation based on Amp's HTTP c…
Browse files Browse the repository at this point in the history
…lient
  • Loading branch information
nicolas-grekas committed Dec 30, 2019
1 parent 392d0b0 commit 8b4f81a
Show file tree
Hide file tree
Showing 9 changed files with 788 additions and 11 deletions.
1 change: 1 addition & 0 deletions composer.json
Expand Up @@ -99,6 +99,7 @@
"symfony/yaml": "self.version"
},
"require-dev": {
"amphp/http-client": "^4.0",
"cache/integration-tests": "dev-master",
"doctrine/annotations": "~1.0",
"doctrine/cache": "~1.6",
Expand Down
154 changes: 154 additions & 0 deletions src/Symfony/Component/HttpClient/AmpHttpClient.php
@@ -0,0 +1,154 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\HttpClient;

use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Symfony\Component\HttpClient\Exception\TransportException;
use Symfony\Component\HttpClient\Internal\AmpClientState;
use Symfony\Component\HttpClient\Response\AmpResponse;
use Symfony\Component\HttpClient\Response\ResponseStream;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
use Symfony\Contracts\HttpClient\ResponseStreamInterface;
use Symfony\Contracts\Service\ResetInterface;

if (!class_exists(HttpClientBuilder::class)) {
throw new \LogicException('You cannot use "Symfony\Component\HttpClient\AmpHttpClient" as the "amphp/http-client" package is not installed. Try running "composer require amphp/http-client".');
}

/**
* A portable implementation of the HttpClientInterface contracts based on Amp's HTTP client.
*
* @author Nicolas Grekas <p@tchwork.com>
*/
final class AmpHttpClient implements HttpClientInterface, LoggerAwareInterface, ResetInterface
{
use HttpClientTrait;
use LoggerAwareTrait;

private $defaultOptions = self::OPTIONS_DEFAULTS;

/** @var AmpClientState */
private $multi;

/**
* @param array $defaultOptions Default requests' options
* @param int $maxHostConnections The maximum number of connections to open
*
* @see HttpClientInterface::OPTIONS_DEFAULTS for available options
*/
public function __construct(array $defaultOptions = [], int $maxHostConnections = 6, HttpClientBuilder $builder = null)
{
$this->defaultOptions['buffer'] = $this->defaultOptions['buffer'] ?? \Closure::fromCallable([__CLASS__, 'shouldBuffer']);

if ($defaultOptions) {
[, $this->defaultOptions] = self::prepareRequest(null, null, $defaultOptions, $this->defaultOptions);
}

$this->multi = new AmpClientState($builder, $maxHostConnections);
}

/**
* @see HttpClientInterface::OPTIONS_DEFAULTS for available options
*
* {@inheritdoc}
*/
public function request(string $method, string $url, array $options = []): ResponseInterface
{
[$url, $options] = self::prepareRequest($method, $url, $options, $this->defaultOptions);

// TODO: handle all options

// TODO: stream the body upload when possible
$options['body'] = self::getBodyAsString($options['body']);

if ('' !== $options['body'] && 'POST' === $method && !isset($options['normalized_headers']['content-type'])) {
$options['headers'][] = 'Content-Type: application/x-www-form-urlencoded';
}

$this->logger && $this->logger->info(sprintf('Request: %s %s', $method, implode('', $url)));

if (!isset($options['normalized_headers']['user-agent'])) {
$options['headers'][] = 'User-Agent: Symfony HttpClient/Amp';
}

if (0 < $options['max_duration']) {
$options['timeout'] = min($options['max_duration'], $options['timeout']);
}

$request = new Request(implode('', $url), $method);

if ($options['http_version']) {
switch ((float) $options['http_version']) {
case 1.0: $request->setProtocolVersions(['1.0']); break;
case 1.1: $request->setProtocolVersions(['1.1', '1.0']); break;
default: $request->setProtocolVersions(['2', '1.1', '1.0']); break;
}
}

foreach ($options['headers'] as $v) {
$h = explode(': ', $v, 2);
$request->addHeader($h[0], $h[1]);
}

$request->setTcpConnectTimeout(1000 * $options['timeout']);
$request->setTlsHandshakeTimeout(1000 * $options['timeout']);
$request->setTransferTimeout(1000 * $options['max_duration']);
$request->setBody($options['body'] ?? '');

return new AmpResponse($this->multi, $request, $options, $this->logger);
}

/**
* {@inheritdoc}
*/
public function stream($responses, float $timeout = null): ResponseStreamInterface
{
if ($responses instanceof AmpResponse) {
$responses = [$responses];
} elseif (!is_iterable($responses)) {
throw new \TypeError(sprintf('%s() expects parameter 1 to be an iterable of AmpResponse objects, %s given.', __METHOD__, \is_object($responses) ? \get_class($responses) : \gettype($responses)));
}

return new ResponseStream(AmpResponse::stream($responses, $timeout));
}

public function reset()
{
}

private static function getBodyAsString($body): string
{
if (\is_resource($body)) {
return stream_get_contents($body);
}

if (!$body instanceof \Closure) {
return $body;
}

$result = '';

while ('' !== $data = $body(self::$CHUNK_SIZE)) {
if (!\is_string($data)) {
throw new TransportException(sprintf('Return value of the "body" option callback must be string, %s returned.', \gettype($data)));
}

$result .= $data;
}

return $result;
}
}
5 changes: 5 additions & 0 deletions src/Symfony/Component/HttpClient/CHANGELOG.md
@@ -1,6 +1,11 @@
CHANGELOG
=========

5.1.0
-----

* added `AmpHttpClient`, a portable HTTP/2 implementation based on Amp

4.4.0
-----

Expand Down
91 changes: 91 additions & 0 deletions src/Symfony/Component/HttpClient/Internal/AmpClientState.php
@@ -0,0 +1,91 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\HttpClient\Internal;

use Amp\Http\Client\Connection\DefaultConnectionFactory;
use Amp\Http\Client\Connection\LimitedConnectionPool;
use Amp\Http\Client\Connection\UnlimitedConnectionPool;
use Amp\Http\Client\HttpClient;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Socket\Certificate;
use Amp\Socket\ClientTlsContext;
use Amp\Socket\ConnectContext;
use Amp\Socket\DnsConnector;
use Amp\Socket\StaticConnector;
use Amp\Sync\LocalKeyedSemaphore;

/**
* Internal representation of the Amp client's state.
*
* @author Nicolas Grekas <p@tchwork.com>
*
* @internal
*/
final class AmpClientState extends ClientState
{
/** @var HttpClientBuilder[] */
private $clients = [];
private $maxHostConnections = 0;
private $builder;

public function __construct(?HttpClientBuilder $builder, int $maxHostConnections)
{
$this->builder = ($builder ?? (new HttpClientBuilder())->allowDeprecatedUriUserInfo())->followRedirects(0);
$this->maxHostConnections = $maxHostConnections;
}

public function getClient(array $options): HttpClient
{
$options = [
'bindto' => $options['bindto'] ?: '0',
'verify_peer' => $options['verify_peer'],
// verify_host?
'capath' => $options['capath'],
'cafile' => $options['cafile'],
'local_cert' => $options['local_cert'],
'local_pk' => $options['local_pk'],
// passphrase?
// peer_fingerprint?
'ciphers' => $options['ciphers'],
'capture_peer_cert_chain' => $options['capture_peer_cert_chain'] || $options['peer_fingerprint'],
];

$key = implode("\0", $options);

if (isset($this->clients[$key])) {
return $this->clients[$key];
}

$context = new ClientTlsContext('');
$options['verify_peer'] || $context = $context->withoutPeerVerification();
$options['cafile'] && $context = $context->withCaFile($options['cafile']);
$options['capath'] && $context = $context->withCaPath($options['capath']);
$options['local_cert'] && $context = $context->withCertificate(new Certificate($options['local_cert'], $options['local_pk']));
$options['ciphers'] && $context = $context->withCiphers($options['ciphers']);
$options['capture_peer_cert_chain'] && $context = $context->withPeerCapturing();

if ($options['bindto']) {
$connector = (file_exists($options['bindto']) ? 'unix://' : 'tcp://').$options['bindto'];
$connector = new StaticConnector($connector, new DnsConnector());
} else {
$connector = null;
}

$pool = new UnlimitedConnectionPool(new DefaultConnectionFactory($connector, (new ConnectContext())->withTlsContext($context)));

if (0 < $this->maxHostConnections) {
$pool = LimitedConnectionPool::byHost($pool, new LocalKeyedSemaphore($this->maxHostConnections));
}

return $this->clients[$key] = $this->builder->usingPool($pool)->build();
}
}

0 comments on commit 8b4f81a

Please sign in to comment.